You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/02 08:38:34 UTC

[12/32] kylin git commit: KYLIN-2079 add explicit configuration knob for coprocessor timeout

KYLIN-2079 add explicit configuration knob for coprocessor timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6bdd4f38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6bdd4f38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6bdd4f38

Branch: refs/heads/v1.6.0-rc1-hbase1.x
Commit: 6bdd4f38335b3f4f37fd1e263f667d3daf5bfd73
Parents: 11c2c69
Author: gaodayue <ga...@meituan.com>
Authored: Tue Oct 11 15:11:38 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Sun Oct 30 22:32:45 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  4 ++
 .../apache/kylin/common/KylinConfigBase.java    |  8 +--
 .../kylin/gridtable/StorageSideBehavior.java    |  2 +-
 .../apache/kylin/query/ITKylinQueryTest.java    |  4 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  6 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     | 24 +++++++
 .../hbase/cube/v2/ExpectedSizeIterator.java     | 73 ++++++--------------
 7 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 33a4e76..24e8f50 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -137,6 +137,10 @@ kylin.query.mem.budget=3221225472
 
 kylin.query.coprocessor.mem.gb=3
 
+# The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds.
+# You can set it to a smaller value; 0 means use the default.
+# kylin.query.coprocessor.timeout.seconds=0
+
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f49127b..26c280b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -485,10 +485,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }
 
-    public float getCubeVisitTimeoutTimes() {
-        return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1"));
-    }
-
     public int getBadQueryStackTraceDepth() {
         return Integer.parseInt(getOptional("kylin.query.badquery.stacktrace.depth", "10"));
     }
@@ -541,6 +537,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0"));
     }
 
+    public int getQueryCoprocessorTimeoutSeconds() {
+        return Integer.parseInt(this.getOptional("kylin.query.coprocessor.timeout.seconds", "0"));
+    }
+
     public boolean isQuerySecureEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "true"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
index b01ac3f..d87b41b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
@@ -26,7 +26,7 @@ public enum StorageSideBehavior {
     SCAN_FILTER, //only scan+filter used,used for profiling filter speed.  Will not return any result
     SCAN_FILTER_AGGR, //aggregate the result.  Will return results
     SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
-    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10s to simulate slow queries, for test use
+    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10ms to simulate slow queries, for test use
 
     public boolean filterToggledOn() {
         return this.ordinal() >= SCAN_FILTER.ordinal();

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 2ec5324..61926d8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -102,7 +102,7 @@ public class ITKylinQueryTest extends KylinTestBase {
             toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
             BackdoorToggles.setToggles(toggles);
 
-            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s
+            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "3");
 
             //these two cubes has RAW measure, will disturb limit push down
             RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
@@ -115,7 +115,7 @@ public class ITKylinQueryTest extends KylinTestBase {
             RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
             RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
 
-            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s 
+            KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "0"); // set timeout to default
             BackdoorToggles.cleanToggles();
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 4f538ae..d99f80e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -128,10 +128,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
         rawScanByteString = serializeRawScans(rawScans);
         
+        int coprocessorTimeout = getCoprocessorTimeoutMillis();
+        scanRequest.setTimeout(coprocessorTimeout);
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
-        scanRequest.setTimeout(epResultItr.getRpcTimeout());
         scanRequestByteString = serializeGTScanReq(scanRequest);
+
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
         
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index da087c9..05b34c7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -22,11 +22,14 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
@@ -45,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,4 +278,24 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         logger.info(info.toString());
     }
 
+    protected int getCoprocessorTimeoutMillis() {
+        int configTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000;
+        if (configTimeout == 0) {
+            configTimeout = Integer.MAX_VALUE;
+        }
+
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        int rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        // final timeout should be smaller than rpc timeout
+        int upper = (int) (rpcTimeout * 0.9);
+
+        int timeout = Math.min(upper, configTimeout);
+        if (BackdoorToggles.getQueryTimeout() != -1) {
+            timeout = Math.min(upper, BackdoorToggles.getQueryTimeout());
+        }
+
+        logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, timeout);
+        return timeout;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index c27e5fc..2d574bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,50 +24,25 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class ExpectedSizeIterator implements Iterator<byte[]> {
-    private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class);
-
-    BlockingQueue<byte[]> queue;
+import com.google.common.base.Throwables;
 
-    int expectedSize;
-    int current = 0;
-    long rpcTimeout;
-    long timeout;
-    long timeoutTS;
-    volatile Throwable coprocException;
-
-    public ExpectedSizeIterator(int expectedSize) {
+class ExpectedSizeIterator implements Iterator<byte[]> {
+    private BlockingQueue<byte[]> queue;
+    private int expectedSize;
+    private int current = 0;
+    private int coprocessorTimeout;
+    private long deadline;
+    private volatile Throwable coprocException;
+
+    public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) {
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
-        StringBuilder sb = new StringBuilder();
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-
-        this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-        this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-        sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout);
-
-        this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-        sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout);
-
-        logger.info(sb.toString());
-
-        if (BackdoorToggles.getQueryTimeout() != -1) {
-            this.timeout = BackdoorToggles.getQueryTimeout();
-            logger.info("rpc timeout is overwritten to " + this.timeout);
-        }
-
-        this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//longer timeout than coprocessor so that query thread will not timeout faster than coprocessor
+        this.coprocessorTimeout = coprocessorTimeout;
+        // use a deadline longer than the coprocessor timeout so that the query thread does not time out before the coprocessor does
+        this.deadline = System.currentTimeMillis() + coprocessorTimeout * 10;
     }
 
     @Override
@@ -84,22 +59,20 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             current++;
             byte[] ret = null;
 
-            while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
-                ret = queue.poll(10000, TimeUnit.MILLISECONDS);
+            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+                ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
             if (coprocException != null) {
-                if (coprocException instanceof GTScanSelfTerminatedException)
-                    throw (GTScanSelfTerminatedException) coprocException;
-                else
-                    throw new RuntimeException("Error in coprocessor", coprocException);
+                throw Throwables.propagate(coprocException);
+            }
 
-            } else if (ret == null) {
+            if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
-                        GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");
-            } else {
-                return ret;
+                        GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
             }
+
+            return ret;
         } catch (InterruptedException e) {
             throw new RuntimeException("Error when waiting queue", e);
         }
@@ -118,10 +91,6 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
         }
     }
 
-    public long getRpcTimeout() {
-        return this.timeout;
-    }
-
     public void notifyCoprocException(Throwable ex) {
         coprocException = ex;
     }