You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 10:06:07 UTC
kylin git commit: refine coprocessor suicide
Repository: kylin
Updated Branches:
refs/heads/yang-m1 1baa91473 -> 638f7b3c8
refine coprocessor suicide
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/638f7b3c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/638f7b3c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/638f7b3c
Branch: refs/heads/yang-m1
Commit: 638f7b3c8b59b9a70a281352fee243dc424daa12
Parents: 1baa914
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 16:05:31 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 16:05:31 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 ++
.../coprocessor/endpoint/CubeVisitService.java | 35 ++++++++++++++++----
2 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index cf39641..053d99e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -122,6 +122,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+
+ this.timeout *= 1.1;//allow for some delay
logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/638f7b3c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index f6476f8..6058768 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -106,13 +106,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() {
- if (counter++ % 1000 == 1) {
+ if (counter % 1000 == 1) {
if (System.currentTimeMillis() - startTime > timeout) {
normalComplete.setValue(false);
+ logger.error("scanner aborted because timeout");
return false;
}
}
+ if (counter % 100000 == 1) {
+ logger.info("Scanned " + counter + " rows.");
+ }
+
+ counter++;
+
return !nextOne.isEmpty();
}
@@ -218,10 +225,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
appendProfileInfo(sb, "scanned " + counter);
}
- final MutableBoolean normalComplete = new MutableBoolean(true);
- final long startTime = this.serviceStartTime;//request.getStartTime();
- final long timeout = (long) (request.getTimeout() * 0.95);
- InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, normalComplete, startTime, timeout);
+ final MutableBoolean scanNormalComplete = new MutableBoolean(true);
+ final long startTime = this.serviceStartTime;
+ final long timeout = request.getTimeout();
+ InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, scanNormalComplete, startTime, timeout);
if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
scanReq.setAggrCacheGB(0); // disable mem check if so told
@@ -239,6 +246,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
int finalRowCount = 0;
for (GTRecord oneRecord : finalScanner) {
+
+ if (!scanNormalComplete.booleanValue()) {
+ logger.error("aggregate iterator aborted because input iterator aborts");
+ break;
+ }
+
+ if (finalRowCount % 1000 == 1) {
+ if (System.currentTimeMillis() - startTime > timeout) {
+ logger.error("aggregate iterator aborted because timeout");
+ break;
+ }
+ }
+
+
buffer.clear();
oneRecord.exportColumns(scanReq.getColumns(), buffer);
buffer.flip();
@@ -251,7 +272,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
//outputStream.close() is not necessary
byte[] compressedAllRows;
- if (normalComplete.booleanValue()) {
+ if (scanNormalComplete.booleanValue()) {
allRows = outputStream.toByteArray();
} else {
allRows = new byte[0];
@@ -282,7 +303,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
setFreeSwapSpaceSize(freeSwapSpaceSize).//
setHostname(InetAddress.getLocalHost().getHostName()).//
setEtcMsg(sb.toString()).//
- setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
+ setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
.//
build());