You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/22 03:26:41 UTC

kylin git commit: KYLIN-1645 Report coproc exception back to the query thread

Repository: kylin
Updated Branches:
  refs/heads/master 57727187c -> a80a0f734


KYLIN-1645 Report coproc exception back to the query thread


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a80a0f73
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a80a0f73
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a80a0f73

Branch: refs/heads/master
Commit: a80a0f7346247af607bb17f31845b1ecb0697330
Parents: 5772718
Author: Li Yang <li...@apache.org>
Authored: Wed Jun 22 11:26:26 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Jun 22 11:26:26 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a80a0f73/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 9cc1bee..bacf6e2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -86,6 +86,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         int current = 0;
         long timeout;
         long timeoutTS;
+        volatile Throwable coprocException;
 
         public ExpectedSizeIterator(int expectedSize) {
             this.expectedSize = expectedSize;
@@ -117,19 +118,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
             try {
                 current++;
-                long tsRemaining = this.timeoutTS - System.currentTimeMillis();
-                if (tsRemaining < 0) {
-                    throw new RuntimeException("Timeout visiting cube!");
+                byte[] ret = null;
+                
+                while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) {
+                    ret = queue.poll(5000, TimeUnit.MILLISECONDS);
                 }
 
-                byte[] ret = queue.poll(tsRemaining, TimeUnit.MILLISECONDS);
-                if (ret == null) {
+                if (coprocException != null) {
+                    throw new RuntimeException("Error in coprocessor", coprocException);
+                } else if (ret == null) {
                     throw new RuntimeException("Timeout visiting cube!");
                 } else {
                     return ret;
                 }
             } catch (InterruptedException e) {
-                throw new RuntimeException("error when waiting queue", e);
+                throw new RuntimeException("Error when waiting queue", e);
             }
         }
 
@@ -149,6 +152,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         public long getTimeout() {
             return timeout;
         }
+
+        public void notifyCoprocException(Throwable ex) {
+            coprocException = ex;
+        }
     }
 
     static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -389,12 +396,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                     }
                                 });
 
-                    } catch (Throwable throwable) {
-                        throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
+                    } catch (Throwable ex) {
+                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex);
+                        epResultItr.notifyCoprocException(ex);
+                        return;
                     }
 
                     if (abnormalFinish[0]) {
-                        throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+                        Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+                        epResultItr.notifyCoprocException(ex);
+                        return;
                     }
                 }
             });