You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 10:06:07 UTC

kylin git commit: refine coprocessor suicide

Repository: kylin
Updated Branches:
  refs/heads/yang-m1 1baa91473 -> 638f7b3c8


refine coprocessor suicide


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/638f7b3c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/638f7b3c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/638f7b3c

Branch: refs/heads/yang-m1
Commit: 638f7b3c8b59b9a70a281352fee243dc424daa12
Parents: 1baa914
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 16:05:31 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 16:05:31 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  2 ++
 .../coprocessor/endpoint/CubeVisitService.java  | 35 ++++++++++++++++----
 2 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index cf39641..053d99e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -122,6 +122,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
             this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
             this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+
+            this.timeout *= 1.1;//allow for some delay 
             logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index f6476f8..6058768 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -106,13 +106,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
         @Override
         public boolean hasNext() {
-            if (counter++ % 1000 == 1) {
+            if (counter % 1000 == 1) {
                 if (System.currentTimeMillis() - startTime > timeout) {
                     normalComplete.setValue(false);
+                    logger.error("scanner aborted because timeout");
                     return false;
                 }
             }
 
+            if (counter % 100000 == 1) {
+                logger.info("Scanned " + counter + " rows.");
+            }
+
+            counter++;
+
             return !nextOne.isEmpty();
         }
 
@@ -218,10 +225,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 appendProfileInfo(sb, "scanned " + counter);
             }
 
-            final MutableBoolean normalComplete = new MutableBoolean(true);
-            final long startTime = this.serviceStartTime;//request.getStartTime();
-            final long timeout = (long) (request.getTimeout() * 0.95);
-            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, normalComplete, startTime, timeout);
+            final MutableBoolean scanNormalComplete = new MutableBoolean(true);
+            final long startTime = this.serviceStartTime;
+            final long timeout = request.getTimeout();
+            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, scanNormalComplete, startTime, timeout);
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
@@ -239,6 +246,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
             int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
+
+                if (!scanNormalComplete.booleanValue()) {
+                    logger.error("aggregate iterator aborted because input iterator aborts");
+                    break;
+                }
+
+                if (finalRowCount % 1000 == 1) {
+                    if (System.currentTimeMillis() - startTime > timeout) {
+                        logger.error("aggregate iterator aborted because timeout");
+                        break;
+                    }
+                }
+
+                
                 buffer.clear();
                 oneRecord.exportColumns(scanReq.getColumns(), buffer);
                 buffer.flip();
@@ -251,7 +272,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             //outputStream.close() is not necessary
             byte[] compressedAllRows;
-            if (normalComplete.booleanValue()) {
+            if (scanNormalComplete.booleanValue()) {
                 allRows = outputStream.toByteArray();
             } else {
                 allRows = new byte[0];
@@ -282,7 +303,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
+                            setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
                     .//
                     build());