You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/02 08:47:40 UTC

kylin git commit: KYLIN-1757 Return coprocessor result as soon as region result is back

Repository: kylin
Updated Branches:
  refs/heads/master 69157bb12 -> 2d6cd529d


KYLIN-1757 Return coprocessor result as soon as region result is back


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2d6cd529
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2d6cd529
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2d6cd529

Branch: refs/heads/master
Commit: 2d6cd529dab65f06a067eba4bbea16fde19e1476
Parents: 69157bb
Author: Li Yang <li...@apache.org>
Authored: Thu Jun 2 16:46:46 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Jun 2 16:46:46 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 97 +++++++++++---------
 1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d6cd529/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1ebc736..0c6a053 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -23,7 +23,6 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -59,8 +58,11 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,37 +340,58 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public void run() {
 
-                    String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
+                    final String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
+                    final boolean[] abnormalFinish = new boolean[1];
 
-                    Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                     try {
-                        HTableInterface htable = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
-                        results = getResults(builder.build(), htable, epRange.getFirst(), epRange.getSecond());
+                        HTableInterface table = conn.getTable(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+
+                        final CubeVisitRequest request = builder.build();
+                        final byte[] startKey = epRange.getFirst();
+                        final byte[] endKey = epRange.getSecond();
+
+                        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+                                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+                                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+                                        ServerRpcController controller = new ServerRpcController();
+                                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+                                        rowsService.visitCube(controller, request, rpcCallback);
+                                        CubeVisitResponse response = rpcCallback.get();
+                                        if (controller.failedOnException()) {
+                                            throw controller.getFailedOn();
+                                        }
+                                        return response;
+                                    }
+                                }, new Batch.Callback<CubeVisitResponse>() {
+                                    @Override
+                                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+                                        if (region == null)
+                                            return;
+
+                                        totalScannedCount.addAndGet(result.getStats().getScannedRowCount());
+                                        logger.info(logHeader + getStatsString(region, result));
+
+                                        if (result.getStats().getNormalComplete() != 1) {
+                                            abnormalFinish[0] = true;
+                                            return;
+                                        }
+                                        try {
+                                            if (compressionResult) {
+                                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                            } else {
+                                                epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                            }
+                                        } catch (IOException | DataFormatException e) {
+                                            throw new RuntimeException(logHeader + "Error when decompressing", e);
+                                        }
+                                    }
+                                });
+
                     } catch (Throwable throwable) {
                         throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
                     }
 
-                    boolean abnormalFinish = false;
-                    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
-                        totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
-                        logger.info(logHeader + getStatsString(result));
-
-                        if (result.getValue().getStats().getNormalComplete() != 1) {
-                            abnormalFinish = true;
-                        } else {
-                            try {
-                                if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
-                                } else {
-                                    epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()));
-                                }
-                            } catch (IOException | DataFormatException e) {
-                                throw new RuntimeException(logHeader + "Error when decompressing", e);
-                            }
-                        }
-                    }
-
-                    if (abnormalFinish) {
+                    if (abnormalFinish[0]) {
                         throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
                     }
                 }
@@ -378,10 +401,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
     }
 
-    private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
+    private String getStatsString(byte[] region, CubeVisitResponse result) {
         StringBuilder sb = new StringBuilder();
-        Stats stats = result.getValue().getStats();
-        sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(result.getKey())).append(" on host: ").append(stats.getHostname()).append(".");
+        Stats stats = result.getStats();
+        sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
         sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
         sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
         sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
@@ -392,20 +415,4 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     }
 
-    private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
-        Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
-            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
-                BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
-                rowsService.visitCube(controller, request, rpcCallback);
-                CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
-                if (controller.failedOnException()) {
-                    throw controller.getFailedOn();
-                }
-                return response;
-            }
-        });
-
-        return results;
-    }
 }