You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/31 01:00:32 UTC

[2/4] incubator-kylin git commit: half way

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e40328a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
index d4e8529..40e9b28 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
@@ -1,88 +1,89 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class PingHBaseCLI {
-
-    public static void main(String[] args) throws IOException {
-        String hbaseTable = args[0];
-
-        System.out.println("Hello friend.");
-
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        if (User.isHBaseSecurityEnabled(hconf)) {
-            try {
-                System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
-                TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
-            } catch (InterruptedException e) {
-                System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
-            }
-        }
-
-        Scan scan = new Scan();
-        int limit = 20;
-
-        HConnection conn = null;
-        HTableInterface table = null;
-        ResultScanner scanner = null;
-        try {
-            conn = HConnectionManager.createConnection(hconf);
-            table = conn.getTable(hbaseTable);
-            scanner = table.getScanner(scan);
-            int count = 0;
-            for (Result r : scanner) {
-                byte[] rowkey = r.getRow();
-                System.out.println(Bytes.toStringBinary(rowkey));
-                count++;
-                if (count == limit)
-                    break;
-            }
-        } finally {
-            if (scanner != null) {
-                scanner.close();
-            }
-            if (table != null) {
-                table.close();
-            }
-            if (conn != null) {
-                conn.close();
-            }
-        }
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class PingHBaseCLI {
+
+    public static void main(String[] args) throws IOException {
+        String hbaseTable = args[0];
+
+        System.out.println("Hello friend.");
+
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        if (User.isHBaseSecurityEnabled(hconf)) {
+            try {
+                System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
+                TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
+            } catch (InterruptedException e) {
+                System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
+            }
+        }
+
+        Scan scan = new Scan();
+        int limit = 20;
+
+        Connection conn = null;
+        Table table = null;
+        ResultScanner scanner = null;
+        try {
+            conn = ConnectionFactory.createConnection(hconf);
+            table = conn.getTable(TableName.valueOf(hbaseTable));
+            scanner = table.getScanner(scan);
+            int count = 0;
+            for (Result r : scanner) {
+                byte[] rowkey = r.getRow();
+                System.out.println(Bytes.toStringBinary(rowkey));
+                count++;
+                if (count == limit)
+                    break;
+            }
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+            if (table != null) {
+                table.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e40328a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index a115753..e950e5b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.kv.RowValueDecoder;
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITupleIterator segmentIterator;
     private int scanCount;
 
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
 
         this.context = context;
         int limit = context.getLimit();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e40328a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 7a0ab15..87f9354 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -26,8 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -79,14 +80,14 @@ public class EndpointTupleIterator implements ITupleIterator {
 
     Iterator<List<IIProtos.IIResponse.IIRow>> regionResponsesIterator = null;
     ITupleIterator tupleIterator = null;
-    HTableInterface table = null;
+    Table table = null;
 
     int rowsInAllMetric = 0;
 
-    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws Throwable {
+    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn) throws Throwable {
 
         String tableName = segment.getStorageLocationIdentifier();
-        table = conn.getTable(tableName);
+        table = conn.getTable(TableName.valueOf(tableName));
         factTableName = segment.getIIDesc().getFactTableName();
 
         if (rootFilter == null) {
@@ -212,7 +213,7 @@ public class EndpointTupleIterator implements ITupleIterator {
     }
 
     //TODO : async callback
-    private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
+    private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
         Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() {
             public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException {
                 ServerRpcController controller = new ServerRpcController();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e40328a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index f0f7ed5..fa2a7c1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
@@ -58,7 +58,7 @@ public class ObserverEnabler {
     static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
 
     public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
-            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
 
         if (context.isCoprocessorEnabled() == false) {
             return table.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e40328a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index f7fcef1..481aab0 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -1,115 +1,115 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
-    IIInstance ii;
-    IISegment seg;
-    HConnection hconn;
-
-    TableRecordInfo info;
-
-    @Before
-    public void setup() throws Exception {
-        this.createTestMetadata();
-
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
-        this.seg = ii.getFirstSegment();
-
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        hconn = HConnectionManager.createConnection(hconf);
-
-        this.info = new TableRecordInfo(seg);
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testLoad() throws Exception {
-
-        String tableName = seg.getStorageLocationIdentifier();
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
-        List<Slice> slices = Lists.newArrayList();
-        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-        try {
-            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
-                slices.add(slice);
-            }
-        } finally {
-            kvIterator.close();
-        }
-
-        List<TableRecord> records = iterateRecords(slices);
-        dump(records);
-        System.out.println(records.size() + " records");
-    }
-
-    private List<TableRecord> iterateRecords(List<Slice> slices) {
-        List<TableRecord> records = Lists.newArrayList();
-        for (Slice slice : slices) {
-            for (RawTableRecord rec : slice) {
-                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
-            }
-        }
-        return records;
-    }
-
-    private void dump(Iterable<TableRecord> records) {
-        for (TableRecord rec : records) {
-            System.out.println(rec.toString());
-
-            byte[] x = rec.getBytes();
-            String y = BytesUtil.toReadableText(x);
-            System.out.println(y);
-            System.out.println();
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+    IIInstance ii;
+    IISegment seg;
+    Connection hconn;
+
+    TableRecordInfo info;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
+        this.seg = ii.getFirstSegment();
+
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        hconn = HConnectionManager.createConnection(hconf);
+
+        this.info = new TableRecordInfo(seg);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoad() throws Exception {
+
+        String tableName = seg.getStorageLocationIdentifier();
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+        List<Slice> slices = Lists.newArrayList();
+        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+        try {
+            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+                slices.add(slice);
+            }
+        } finally {
+            kvIterator.close();
+        }
+
+        List<TableRecord> records = iterateRecords(slices);
+        dump(records);
+        System.out.println(records.size() + " records");
+    }
+
+    private List<TableRecord> iterateRecords(List<Slice> slices) {
+        List<TableRecord> records = Lists.newArrayList();
+        for (Slice slice : slices) {
+            for (RawTableRecord rec : slice) {
+                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+            }
+        }
+        return records;
+    }
+
+    private void dump(Iterable<TableRecord> records) {
+        for (TableRecord rec : records) {
+            System.out.println(rec.toString());
+
+            byte[] x = rec.getBytes();
+            String y = BytesUtil.toReadableText(x);
+            System.out.println(y);
+            System.out.println();
+        }
+    }
+
+}