You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/02 08:38:34 UTC
[12/32] kylin git commit: KYLIN-2079 add explicit configuration knob for coprocessor timeout
KYLIN-2079 add explicit configuration knob for coprocessor timeout
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6bdd4f38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6bdd4f38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6bdd4f38
Branch: refs/heads/v1.6.0-rc1-hbase1.x
Commit: 6bdd4f38335b3f4f37fd1e263f667d3daf5bfd73
Parents: 11c2c69
Author: gaodayue <ga...@meituan.com>
Authored: Tue Oct 11 15:11:38 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Sun Oct 30 22:32:45 2016 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 4 ++
.../apache/kylin/common/KylinConfigBase.java | 8 +--
.../kylin/gridtable/StorageSideBehavior.java | 2 +-
.../apache/kylin/query/ITKylinQueryTest.java | 4 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 6 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 24 +++++++
.../hbase/cube/v2/ExpectedSizeIterator.java | 73 ++++++--------------
7 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 33a4e76..24e8f50 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -137,6 +137,10 @@ kylin.query.mem.budget=3221225472
kylin.query.coprocessor.mem.gb=3
+# The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds.
+# Set a smaller value to shorten it (larger values are capped at the default); 0 means use the default.
+# kylin.query.coprocessor.timeout.seconds=0
+
# Enable/disable ACL check for cube query
kylin.query.security.enabled=true
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f49127b..26c280b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -485,10 +485,6 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
- public float getCubeVisitTimeoutTimes() {
- return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1"));
- }
-
public int getBadQueryStackTraceDepth() {
return Integer.parseInt(getOptional("kylin.query.badquery.stacktrace.depth", "10"));
}
@@ -541,6 +537,10 @@ abstract public class KylinConfigBase implements Serializable {
return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0"));
}
+ public int getQueryCoprocessorTimeoutSeconds() {
+ return Integer.parseInt(this.getOptional("kylin.query.coprocessor.timeout.seconds", "0"));
+ }
+
public boolean isQuerySecureEnabled() {
return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "true"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
index b01ac3f..d87b41b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
@@ -26,7 +26,7 @@ public enum StorageSideBehavior {
SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result
SCAN_FILTER_AGGR, //aggregate the result. Will return results
SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
- SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10s to simulate slow queries, for test use
+ SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10ms to simulate slow queries, for test use
public boolean filterToggledOn() {
return this.ordinal() >= SCAN_FILTER.ordinal();
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 2ec5324..61926d8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -102,7 +102,7 @@ public class ITKylinQueryTest extends KylinTestBase {
toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
BackdoorToggles.setToggles(toggles);
- KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "3");
//these two cubes has RAW measure, will disturb limit push down
RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
@@ -115,7 +115,7 @@ public class ITKylinQueryTest extends KylinTestBase {
RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
- KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "0"); // set timeout to default
BackdoorToggles.cleanToggles();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 4f538ae..d99f80e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -128,10 +128,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
rawScanByteString = serializeRawScans(rawScans);
+ int coprocessorTimeout = getCoprocessorTimeoutMillis();
+ scanRequest.setTimeout(coprocessorTimeout);
scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
- final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
- scanRequest.setTimeout(epResultItr.getRpcTimeout());
scanRequestByteString = serializeGTScanReq(scanRequest);
+
+ final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index da087c9..05b34c7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -22,11 +22,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
@@ -45,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -274,4 +278,24 @@ public abstract class CubeHBaseRPC implements IGTStorage {
logger.info(info.toString());
}
+ protected int getCoprocessorTimeoutMillis() {
+ int configTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000;
+ if (configTimeout == 0) {
+ configTimeout = Integer.MAX_VALUE;
+ }
+
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ int rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ // final timeout should be smaller than rpc timeout
+ int upper = (int) (rpcTimeout * 0.9);
+
+ int timeout = Math.min(upper, configTimeout);
+ if (BackdoorToggles.getQueryTimeout() != -1) {
+ timeout = Math.min(upper, BackdoorToggles.getQueryTimeout());
+ }
+
+ logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, timeout);
+ return timeout;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index c27e5fc..2d574bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,50 +24,25 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-class ExpectedSizeIterator implements Iterator<byte[]> {
- private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class);
-
- BlockingQueue<byte[]> queue;
+import com.google.common.base.Throwables;
- int expectedSize;
- int current = 0;
- long rpcTimeout;
- long timeout;
- long timeoutTS;
- volatile Throwable coprocException;
-
- public ExpectedSizeIterator(int expectedSize) {
+class ExpectedSizeIterator implements Iterator<byte[]> {
+ private BlockingQueue<byte[]> queue;
+ private int expectedSize;
+ private int current = 0;
+ private int coprocessorTimeout;
+ private long deadline;
+ private volatile Throwable coprocException;
+
+ public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) {
this.expectedSize = expectedSize;
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
- StringBuilder sb = new StringBuilder();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-
- this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout);
-
- this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
- sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout);
-
- logger.info(sb.toString());
-
- if (BackdoorToggles.getQueryTimeout() != -1) {
- this.timeout = BackdoorToggles.getQueryTimeout();
- logger.info("rpc timeout is overwritten to " + this.timeout);
- }
-
- this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//longer timeout than coprocessor so that query thread will not timeout faster than coprocessor
+ this.coprocessorTimeout = coprocessorTimeout;
+ //longer timeout than coprocessor so that query thread will not timeout faster than coprocessor
+ this.deadline = System.currentTimeMillis() + coprocessorTimeout * 10;
}
@Override
@@ -84,22 +59,20 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
current++;
byte[] ret = null;
- while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
- ret = queue.poll(10000, TimeUnit.MILLISECONDS);
+ while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+ ret = queue.poll(1000, TimeUnit.MILLISECONDS);
}
if (coprocException != null) {
- if (coprocException instanceof GTScanSelfTerminatedException)
- throw (GTScanSelfTerminatedException) coprocException;
- else
- throw new RuntimeException("Error in coprocessor", coprocException);
+ throw Throwables.propagate(coprocException);
+ }
- } else if (ret == null) {
+ if (ret == null) {
throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
- GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");
- } else {
- return ret;
+ GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
}
+
+ return ret;
} catch (InterruptedException e) {
throw new RuntimeException("Error when waiting queue", e);
}
@@ -118,10 +91,6 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
}
}
- public long getRpcTimeout() {
- return this.timeout;
- }
-
public void notifyCoprocException(Throwable ex) {
coprocException = ex;
}