You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/02 08:47:40 UTC
kylin git commit: KYLIN-1757 Return coprocessor result as soon as region result is back
Repository: kylin
Updated Branches:
refs/heads/master 69157bb12 -> 2d6cd529d
KYLIN-1757 Return coprocessor result as soon as region result is back
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2d6cd529
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2d6cd529
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2d6cd529
Branch: refs/heads/master
Commit: 2d6cd529dab65f06a067eba4bbea16fde19e1476
Parents: 69157bb
Author: Li Yang <li...@apache.org>
Authored: Thu Jun 2 16:46:46 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Jun 2 16:46:46 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 97 +++++++++++---------
1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/2d6cd529/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1ebc736..0c6a053 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -23,7 +23,6 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -59,8 +58,11 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -338,37 +340,58 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public void run() {
- String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
+ final String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
+ final boolean[] abnormalFinish = new boolean[1];
- Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
try {
- HTableInterface htable = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
- results = getResults(builder.build(), htable, epRange.getFirst(), epRange.getSecond());
+ HTableInterface table = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+
+ final CubeVisitRequest request = builder.build();
+ final byte[] startKey = epRange.getFirst();
+ final byte[] endKey = epRange.getSecond();
+
+ table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+ new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+ public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+ rowsService.visitCube(controller, request, rpcCallback);
+ CubeVisitResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ return response;
+ }
+ }, new Batch.Callback<CubeVisitResponse>() {
+ @Override
+ public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+ if (region == null)
+ return;
+
+ totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
+ logger.info(logHeader + getStatsString(region, result));
+
+ if (result.getStats().getNormalComplete() != 1) {
+ abnormalFinish[0] = true;
+ return;
+ }
+ try {
+ if (compressionResult) {
+ epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+ } else {
+ epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+ }
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException(logHeader + "Error when decompressing", e);
+ }
+ }
+ });
+
} catch (Throwable throwable) {
throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
}
- boolean abnormalFinish = false;
- for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
- totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info(logHeader + getStatsString(result));
-
- if (result.getValue().getStats().getNormalComplete() != 1) {
- abnormalFinish = true;
- } else {
- try {
- if (compressionResult) {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
- } else {
- epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()));
- }
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException(logHeader + "Error when decompressing", e);
- }
- }
- }
-
- if (abnormalFinish) {
+ if (abnormalFinish[0]) {
throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
}
}
@@ -378,10 +401,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
}
- private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
+ private String getStatsString(byte[] region, CubeVisitResponse result) {
StringBuilder sb = new StringBuilder();
- Stats stats = result.getValue().getStats();
- sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(result.getKey())).append(" on host: ").append(stats.getHostname()).append(".");
+ Stats stats = result.getStats();
+ sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
@@ -392,20 +415,4 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
- private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
- Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
- public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
- rowsService.visitCube(controller, request, rpcCallback);
- CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
- return response;
- }
- });
-
- return results;
- }
}