You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/24 07:05:03 UTC
[25/50] kylin git commit: KYLIN-1645 Report coproc exception back to the query thread
KYLIN-1645 Report coproc exception back to the query thread
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a80a0f73
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a80a0f73
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a80a0f73
Branch: refs/heads/stream_m1
Commit: a80a0f7346247af607bb17f31845b1ecb0697330
Parents: 5772718
Author: Li Yang <li...@apache.org>
Authored: Wed Jun 22 11:26:26 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Jun 22 11:26:26 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 29 ++++++++++++++------
1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a80a0f73/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9cc1bee..bacf6e2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -86,6 +86,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
int current = 0;
long timeout;
long timeoutTS;
+ volatile Throwable coprocException;
public ExpectedSizeIterator(int expectedSize) {
this.expectedSize = expectedSize;
@@ -117,19 +118,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
try {
current++;
- long tsRemaining = this.timeoutTS - System.currentTimeMillis();
- if (tsRemaining < 0) {
- throw new RuntimeException("Timeout visiting cube!");
+ byte[] ret = null;
+
+ while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) {
+ ret = queue.poll(5000, TimeUnit.MILLISECONDS);
}
- byte[] ret = queue.poll(tsRemaining, TimeUnit.MILLISECONDS);
- if (ret == null) {
+ if (coprocException != null) {
+ throw new RuntimeException("Error in coprocessor", coprocException);
+ } else if (ret == null) {
throw new RuntimeException("Timeout visiting cube!");
} else {
return ret;
}
} catch (InterruptedException e) {
- throw new RuntimeException("error when waiting queue", e);
+ throw new RuntimeException("Error when waiting queue", e);
}
}
@@ -149,6 +152,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
public long getTimeout() {
return timeout;
}
+
+ public void notifyCoprocException(Throwable ex) {
+ coprocException = ex;
+ }
}
static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -389,12 +396,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
});
- } catch (Throwable throwable) {
- throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
+ } catch (Throwable ex) {
+ logger.error(logHeader + "Error when visiting cubes by endpoint", ex);
+ epResultItr.notifyCoprocException(ex);
+ return;
}
if (abnormalFinish[0]) {
- throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+ Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+ epResultItr.notifyCoprocException(ex);
+ return;
}
}
});