You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/03 11:27:39 UTC
[2/4] incubator-kylin git commit: KYLIN-920 & KYLIN-782 Upgrade to
HBase 1.1 (with help from murkrishn)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
index d4e8529..4a9c574 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
@@ -1,88 +1,91 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-
-/**
- * @author yangli9
- *
- */
-public class PingHBaseCLI {
-
- public static void main(String[] args) throws IOException {
- String hbaseTable = args[0];
-
- System.out.println("Hello friend.");
-
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- if (User.isHBaseSecurityEnabled(hconf)) {
- try {
- System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
- TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
- } catch (InterruptedException e) {
- System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
- }
- }
-
- Scan scan = new Scan();
- int limit = 20;
-
- HConnection conn = null;
- HTableInterface table = null;
- ResultScanner scanner = null;
- try {
- conn = HConnectionManager.createConnection(hconf);
- table = conn.getTable(hbaseTable);
- scanner = table.getScanner(scan);
- int count = 0;
- for (Result r : scanner) {
- byte[] rowkey = r.getRow();
- System.out.println(Bytes.toStringBinary(rowkey));
- count++;
- if (count == limit)
- break;
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- table.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+
+/**
+ * @author yangli9
+ *
+ */
+public class PingHBaseCLI {
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ String hbaseTable = args[0];
+
+ System.out.println("Hello friend.");
+
+ Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+ if (User.isHBaseSecurityEnabled(hconf)) {
+ Connection conn = ConnectionFactory.createConnection(hconf);
+ try {
+ UserProvider userProvider = UserProvider.instantiate(hconf);
+ TokenUtil.obtainAndCacheToken(conn, userProvider.create(UserGroupInformation.getCurrentUser()));
+ } finally {
+ conn.close();
+ }
+ }
+
+ Scan scan = new Scan();
+ int limit = 20;
+
+ Connection conn = null;
+ Table table = null;
+ ResultScanner scanner = null;
+ try {
+ conn = ConnectionFactory.createConnection(hconf);
+ table = conn.getTable(TableName.valueOf(hbaseTable));
+ scanner = table.getScanner(scan);
+ int count = 0;
+ for (Result r : scanner) {
+ byte[] rowkey = r.getRow();
+ System.out.println(Bytes.toStringBinary(rowkey));
+ count++;
+ if (count == limit)
+ break;
+ }
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
index e2eeed0..a07cbe4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
/**
* @author yangli9
@@ -50,7 +51,7 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -60,11 +61,16 @@ public class RegionScannerAdapter implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
+ public int getBatch() {
+ return -1;
+ }
+
+ @Override
public void close() throws IOException {
scanner.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index a115753..e950e5b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.kv.RowValueDecoder;
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITupleIterator segmentIterator;
private int scanCount;
- public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+ public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.context = context;
int limit = context.getLimit();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 7a0ab15..a3ece7e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -26,8 +26,9 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -79,14 +80,14 @@ public class EndpointTupleIterator implements ITupleIterator {
Iterator<List<IIProtos.IIResponse.IIRow>> regionResponsesIterator = null;
ITupleIterator tupleIterator = null;
- HTableInterface table = null;
+ Table table = null;
int rowsInAllMetric = 0;
- public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws Throwable {
+ public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn) throws Throwable {
String tableName = segment.getStorageLocationIdentifier();
- table = conn.getTable(tableName);
+ table = conn.getTable(TableName.valueOf(tableName));
factTableName = segment.getIIDesc().getFactTableName();
if (rootFilter == null) {
@@ -212,7 +213,7 @@ public class EndpointTupleIterator implements ITupleIterator {
}
//TODO : async callback
- private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
+ private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() {
public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException {
ServerRpcController controller = new ServerRpcController();
@@ -235,7 +236,7 @@ public class EndpointTupleIterator implements ITupleIterator {
int index = 0;
for (int i = 0; i < columns.size(); i++) {
- TblColRef column = columns.get(i);
+ // TblColRef column = columns.get(i);
// if (!dimensions.contains(column)) {
// continue;
// }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index d55361b..3c91329 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -90,7 +90,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
RegionScanner innerScanner = null;
HRegion region = null;
try {
- region = env.getRegion();
+ region = (HRegion) env.getRegion();
innerScanner = region.getScanner(buildScan());
region.startRegionOperation();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index 2cecd5c..c21ee36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -93,7 +93,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
- HRegion region = ctxt.getEnvironment().getRegion();
+ HRegion region = (HRegion) ctxt.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (innerScanner) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 65b616f..c1a0b15 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
@@ -93,13 +94,18 @@ public class AggregationScanner implements RegionScanner {
}
@Override
+ public int getBatch() {
+ return outerScanner.getBatch();
+ }
+
+ @Override
public boolean next(List<Cell> results) throws IOException {
return outerScanner.next(results);
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- return outerScanner.next(result, limit);
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.next(result, scannerContext);
}
@Override
@@ -108,8 +114,8 @@ public class AggregationScanner implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return outerScanner.nextRaw(result, limit);
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return outerScanner.nextRaw(result, scannerContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
index f609a5a..dea1d9e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
@@ -107,7 +108,7 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@@ -117,11 +118,16 @@ public class ObserverAggregationCache extends AggregationCache {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
+ public int getBatch() {
+ return innerScanner.getBatch();
+ }
+
+ @Override
public void close() throws IOException {
// AggregateRegionObserver.LOG.info("Kylin Scanner close()");
innerScanner.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index f0f7ed5..fa2a7c1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -23,9 +23,9 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
@@ -58,7 +58,7 @@ public class ObserverEnabler {
static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
- Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+ Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
if (context.isCoprocessorEnabled() == false) {
return table.getScanner(scan);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index f7fcef1..50069a1 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -1,115 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
- IIInstance ii;
- IISegment seg;
- HConnection hconn;
-
- TableRecordInfo info;
-
- @Before
- public void setup() throws Exception {
- this.createTestMetadata();
-
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
- this.seg = ii.getFirstSegment();
-
- Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
- hconn = HConnectionManager.createConnection(hconf);
-
- this.info = new TableRecordInfo(seg);
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testLoad() throws Exception {
-
- String tableName = seg.getStorageLocationIdentifier();
- IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
- List<Slice> slices = Lists.newArrayList();
- HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
- try {
- for (Slice slice : codec.decodeKeyValue(kvIterator)) {
- slices.add(slice);
- }
- } finally {
- kvIterator.close();
- }
-
- List<TableRecord> records = iterateRecords(slices);
- dump(records);
- System.out.println(records.size() + " records");
- }
-
- private List<TableRecord> iterateRecords(List<Slice> slices) {
- List<TableRecord> records = Lists.newArrayList();
- for (Slice slice : slices) {
- for (RawTableRecord rec : slice) {
- records.add(new TableRecord((RawTableRecord) rec.clone(), info));
- }
- }
- return records;
- }
-
- private void dump(Iterable<TableRecord> records) {
- for (TableRecord rec : records) {
- System.out.println(rec.toString());
-
- byte[] x = rec.getBytes();
- String y = BytesUtil.toReadableText(x);
- System.out.println(y);
- System.out.println();
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+ IIInstance ii;
+ IISegment seg;
+ Connection hconn;
+
+ TableRecordInfo info;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+
+ this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
+ this.seg = ii.getFirstSegment();
+
+ this.hconn = HBaseConnection.get();
+
+ this.info = new TableRecordInfo(seg);
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testLoad() throws Exception {
+
+ String tableName = seg.getStorageLocationIdentifier();
+ IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+ List<Slice> slices = Lists.newArrayList();
+ HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+ try {
+ for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+ slices.add(slice);
+ }
+ } finally {
+ kvIterator.close();
+ }
+
+ List<TableRecord> records = iterateRecords(slices);
+ dump(records);
+ System.out.println(records.size() + " records");
+ }
+
+ private List<TableRecord> iterateRecords(List<Slice> slices) {
+ List<TableRecord> records = Lists.newArrayList();
+ for (Slice slice : slices) {
+ for (RawTableRecord rec : slice) {
+ records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+ }
+ }
+ return records;
+ }
+
+ private void dump(Iterable<TableRecord> records) {
+ for (TableRecord rec : records) {
+ System.out.println(rec.toString());
+
+ byte[] x = rec.getBytes();
+ String y = BytesUtil.toReadableText(x);
+ System.out.println(y);
+ System.out.println();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index 0454b4c..3ace91e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
@@ -223,102 +224,46 @@ public class AggregateRegionObserverTest {
this.input = cellInputs;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
- * .List)
- */
@Override
public boolean next(List<Cell> results) throws IOException {
return nextRaw(results);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
- * .List, int)
- */
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.hbase.regionserver.InternalScanner#close()
- */
@Override
public void close() throws IOException {
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
- */
@Override
public HRegionInfo getRegionInfo() {
return null;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
- */
@Override
public boolean isFilterDone() throws IOException {
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
- */
@Override
public boolean reseek(byte[] row) throws IOException {
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
- */
@Override
public long getMaxResultSize() {
return 0;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
- */
@Override
public long getMvccReadPoint() {
return 0;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
- * .List)
- */
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
if (i < input.size()) {
@@ -328,18 +273,15 @@ public class AggregateRegionObserverTest {
return i < input.size();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
- * .List, int)
- */
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return nextRaw(result);
}
+ @Override
+ public int getBatch() {
+ return -1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
index d17cfa6..b1f6626 100644
--- a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.service.HiveInterface;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -47,7 +46,7 @@ public class HiveMiniClusterTest extends HiveJDBCClientTest {
public static final File HIVE_WAREHOUSE_DIR = new File(HIVE_BASE_DIR + "/warehouse");
public static final File HIVE_TESTDATA_DIR = new File(HIVE_BASE_DIR + "/testdata");
public static final File HIVE_HADOOP_TMP_DIR = new File(HIVE_BASE_DIR + "/hadooptmp");
- protected HiveInterface client;
+ //protected HiveInterface client;
protected MiniDFSCluster miniDFS;
protected MiniMRCluster miniMR;