You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/03 07:34:31 UTC
kylin git commit: KYLIN-1391 quicker and better response to v2
storage engine's rpc timeout exception
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 2528111b4 -> 36e26528e
KYLIN-1391 quicker and better response to v2 storage engine's rpc timeout exception
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/36e26528
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/36e26528
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/36e26528
Branch: refs/heads/2.x-staging
Commit: 36e26528e9f2c45d57c6d974d52b2cf1af8ac315
Parents: 2528111
Author: honma <ho...@ebay.com>
Authored: Tue Feb 2 11:36:23 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Feb 3 14:34:12 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 46 ++++++++++++++++++--
1 file changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/36e26528/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d6ef16c..f22964f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -25,8 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -34,6 +37,7 @@ import java.util.zip.DataFormatException;
import javax.annotation.Nullable;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -48,6 +52,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
@@ -70,17 +75,52 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
public static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);
- private static ExecutorService executorService = Executors.newCachedThreadPool();
+ private static ExecutorService executorService = newLoggableCachedThreadPool();
+
+ /**
+  * Factory for the executor backing this class' static {@code executorService}.
+  *
+  * @return a newly constructed {@link LoggableCachedThreadPool}
+  */
+ public static ExecutorService newLoggableCachedThreadPool() {
+     final ExecutorService loggablePool = new LoggableCachedThreadPool();
+     return loggablePool;
+ }
+
+ /**
+  * Thread pool configured like {@code Executors.newCachedThreadPool()} (no core threads,
+  * unbounded maximum, 60 second idle keep-alive, direct hand-off queue) that additionally
+  * logs every failure of a finished task.
+  *
+  * Tasks handed in through {@code submit()} are wrapped in a {@link Future}, which captures
+  * any exception instead of propagating it to the worker thread; {@link #afterExecute}
+  * unwraps such futures so that those failures show up in the log as well.
+  */
+ public static class LoggableCachedThreadPool extends ThreadPoolExecutor {
+
+     public LoggableCachedThreadPool() {
+         super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+     }
+
+     /**
+      * Logs the failure of a completed task, if there was one.
+      *
+      * @param r the task that has just finished
+      * @param t the exception that terminated the task, or null when it completed normally
+      *          (or when the exception is held inside a Future)
+      */
+     @Override
+     protected void afterExecute(Runnable r, Throwable t) {
+         super.afterExecute(r, t);
+         if (t == null && r instanceof Future<?>) {
+             try {
+                 // the task has already completed here, so get() only surfaces its outcome
+                 ((Future<?>) r).get();
+             } catch (ExecutionException ee) {
+                 logger.error("Execution exception when running task in " + Thread.currentThread().getName());
+                 // fall back to the wrapper itself so the failure is never silently dropped
+                 t = ee.getCause() != null ? ee.getCause() : ee;
+             } catch (InterruptedException ie) {
+                 // keep the exception in the log instead of a bare message
+                 logger.error("Thread interrupted: ", ie);
+                 Thread.currentThread().interrupt(); // restore the interrupt flag for the pool
+             } catch (Throwable throwable) {
+                 // e.g. the unchecked CancellationException thrown for a cancelled task
+                 t = throwable;
+             }
+         }
+         if (t != null) {
+             logger.error("Caught exception in thread " + Thread.currentThread().getName() + ": ", t);
+         }
+     }
+ }
static class ExpectedSizeIterator implements Iterator<byte[]> {
int expectedSize;
int current = 0;
BlockingQueue<byte[]> queue;
+ long timeout;
public ExpectedSizeIterator(int expectedSize) {
this.expectedSize = expectedSize;
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+ this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
}
@Override
@@ -95,7 +135,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
try {
current++;
- return queue.poll(1, TimeUnit.HOURS);
+ return queue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("error when waiting queue", e);
}