You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/03 07:34:31 UTC

kylin git commit: KYLIN-1391 quicker and better response to v2 storage engine's rpc timeout exception

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 2528111b4 -> 36e26528e


KYLIN-1391 quicker and better response to v2 storage engine's rpc timeout exception


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/36e26528
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/36e26528
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/36e26528

Branch: refs/heads/2.x-staging
Commit: 36e26528e9f2c45d57c6d974d52b2cf1af8ac315
Parents: 2528111
Author: honma <ho...@ebay.com>
Authored: Tue Feb 2 11:36:23 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Feb 3 14:34:12 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 46 ++++++++++++++++++--
 1 file changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/36e26528/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d6ef16c..f22964f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -25,8 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
@@ -34,6 +37,7 @@ import java.util.zip.DataFormatException;
 import javax.annotation.Nullable;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -48,6 +52,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
@@ -70,17 +75,52 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     public static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
 
-    private static ExecutorService executorService = Executors.newCachedThreadPool();
+    private static ExecutorService executorService = newLoggableCachedThreadPool();
+
+    public static ExecutorService newLoggableCachedThreadPool() {
+        return new LoggableCachedThreadPool();
+    }
+
+    public static class LoggableCachedThreadPool extends ThreadPoolExecutor {
+
+        public LoggableCachedThreadPool() {
+            super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+        }
+
+        @Override
+        protected void afterExecute(Runnable r, Throwable t) {
+            super.afterExecute(r, t);
+            if (t == null && r instanceof Future<?>) {
+                try {
+                    ((Future<?>) r).get();
+                } catch (ExecutionException ee) {
+                    logger.error("Execution exception when running task in " + Thread.currentThread().getName());
+                    t = ee.getCause();
+                } catch (InterruptedException ie) {
+                    logger.error("Thread interrupted: ");
+                    Thread.currentThread().interrupt(); // ignore/reset
+                } catch (Throwable throwable) {
+                    t = throwable;
+                }
+            }
+            if (t != null) {
+                logger.error("Caught exception in thread " + Thread.currentThread().getName() + ": ", t);
+            }
+        }
+    }
 
     static class ExpectedSizeIterator implements Iterator<byte[]> {
 
         int expectedSize;
         int current = 0;
         BlockingQueue<byte[]> queue;
+        long timeout;
 
         public ExpectedSizeIterator(int expectedSize) {
             this.expectedSize = expectedSize;
             this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+            this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+            logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
         }
 
         @Override
@@ -95,7 +135,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
             try {
                 current++;
-                return queue.poll(1, TimeUnit.HOURS);
+                return queue.poll(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 throw new RuntimeException("error when waiting queue", e);
             }