You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/09 09:48:11 UTC
[4/4] kylin git commit: KYLIN-2005 Move all storage side behavior hints to GTScanRequest
KYLIN-2005 Move all storage side behavior hints to GTScanRequest
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2c875d8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2c875d8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2c875d8
Branch: refs/heads/master
Commit: a2c875d8a2d06f23dd6467bbcc459bff82918295
Parents: e38557b
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Sep 9 16:46:22 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Sep 9 17:47:29 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/gridtable/GTScanRequest.java | 33 +-
.../kylin/gridtable/GTScanRequestBuilder.java | 30 +-
.../kylin/gridtable/StorageSideBehavior.java | 30 +
.../apache/kylin/query/ITKylinQueryTest.java | 4 +-
.../common/coprocessor/CoprocessorBehavior.java | 30 -
.../observer/AggregateRegionObserver.java | 10 +-
.../observer/AggregationScanner.java | 16 +-
.../coprocessor/observer/ObserverEnabler.java | 6 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 88 +--
.../hbase/cube/v2/ExpectedSizeIterator.java | 4 +-
.../coprocessor/endpoint/CubeVisitService.java | 18 +-
.../endpoint/generated/CubeVisitProtos.java | 754 ++++---------------
.../endpoint/protobuf/CubeVisit.proto | 13 +-
.../observer/AggregateRegionObserverTest.java | 6 +-
14 files changed, 332 insertions(+), 710 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 5d27028..3e57e86 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -59,6 +59,9 @@ public class GTScanRequest {
private String[] aggrMetricsFuncs;//
// hint to storage behavior
+ private String storageBehavior;
+ private long startTime;
+ private long timeout;
private boolean allowStorageAggregation;
private double aggCacheMemThreshold;
private int storageScanRowNumThreshold;
@@ -69,7 +72,7 @@ public class GTScanRequest {
GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
- double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
+ double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit, String storageBehavior, long startTime, long timeout) {
this.info = info;
if (ranges == null) {
this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -83,6 +86,9 @@ public class GTScanRequest {
this.aggrMetrics = aggrMetrics;
this.aggrMetricsFuncs = aggrMetricsFuncs;
+ this.storageBehavior = storageBehavior;
+ this.startTime = startTime;
+ this.timeout = timeout;
this.allowStorageAggregation = allowStorageAggregation;
this.aggCacheMemThreshold = aggCacheMemThreshold;
this.storageScanRowNumThreshold = storageScanRowNumThreshold;
@@ -115,6 +121,10 @@ public class GTScanRequest {
}
}
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
private void validateFilterPushDown(GTInfo info) {
if (!hasFilterPushDown())
return;
@@ -280,6 +290,18 @@ public class GTScanRequest {
return this.storagePushDownLimit;
}
+ public String getStorageBehavior() {
+ return storageBehavior;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
@Override
public String toString() {
return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
@@ -320,6 +342,9 @@ public class GTScanRequest {
out.putDouble(value.aggCacheMemThreshold);
BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
BytesUtil.writeVInt(value.storagePushDownLimit, out);
+ BytesUtil.writeVLong(value.startTime, out);
+ BytesUtil.writeVLong(value.timeout, out);
+ BytesUtil.writeUTFString(value.storageBehavior, out);
}
@Override
@@ -350,11 +375,15 @@ public class GTScanRequest {
double sAggrCacheGB = in.getDouble();
int storageScanRowNumThreshold = BytesUtil.readVInt(in);
int storagePushDownLimit = BytesUtil.readVInt(in);
+ long startTime = BytesUtil.readVLong(in);
+ long timeout = BytesUtil.readVLong(in);
+ String storageBehavior = BytesUtil.readUTFString(in);
return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
- setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
+ setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).//
+ setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest();
}
private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index c4390cd..f542de1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -21,6 +21,7 @@ package org.apache.kylin.gridtable;
import java.util.BitSet;
import java.util.List;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -36,6 +37,9 @@ public class GTScanRequestBuilder {
private double aggCacheMemThreshold = 0;
private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.
private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $toragePushDownLimit aggregated rows are produced.
+ private long startTime = -1;
+ private long timeout = -1;
+ private String storageBehavior = null;
public GTScanRequestBuilder setInfo(GTInfo info) {
this.info = info;
@@ -92,6 +96,21 @@ public class GTScanRequestBuilder {
return this;
}
+ public GTScanRequestBuilder setStartTime(long startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ public GTScanRequestBuilder setTimeout(long timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ public GTScanRequestBuilder setStorageBehavior(String storageBehavior) {
+ this.storageBehavior = storageBehavior;
+ return this;
+ }
+
public GTScanRequest createGTScanRequest() {
if (aggrGroupBy == null) {
aggrGroupBy = new ImmutableBitSet(new BitSet());
@@ -104,7 +123,14 @@ public class GTScanRequestBuilder {
if (aggrMetricsFuncs == null) {
aggrMetricsFuncs = new String[0];
}
-
- return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+
+ if (storageBehavior == null) {
+ storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+ }
+
+ this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime;
+ this.timeout = timeout == -1 ? 300000 : timeout;
+
+ return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
new file mode 100644
index 0000000..7fa93e7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+/**
+ */
+public enum StorageSideBehavior {
+ RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed
+ SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result
+ SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result
+ SCAN_FILTER_AGGR, //aggregate the result. Will return results
+ SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
+ SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index fc2fd52..0efea64 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -40,7 +40,7 @@ import org.apache.kylin.query.routing.Candidate;
import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
import org.apache.kylin.query.schema.OLAPSchemaFactory;
import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
@@ -140,7 +140,7 @@ public class ITKylinQueryTest extends KylinTestBase {
});
Map<String, String> toggles = Maps.newHashMap();
- toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
+ toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
BackdoorToggles.setToggles(toggles);
KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
deleted file mode 100644
index 5f21351..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.common.coprocessor;
-
-/**
- */
-public enum CoprocessorBehavior {
- RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed
- SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result
- SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result
- SCAN_FILTER_AGGR, //aggregate the result. Will return results
- SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
- SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
index c7b650a..7139ca7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -85,15 +85,15 @@ public class AggregateRegionObserver extends BaseRegionObserver {
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
- CoprocessorBehavior coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM;
+ StorageSideBehavior storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
try {
byte[] behavior = scan.getAttribute(BEHAVIOR);
if (behavior != null && behavior.length != 0) {
- coprocessorBehavior = CoprocessorBehavior.valueOf(new String(behavior));
+ storageSideBehavior = StorageSideBehavior.valueOf(new String(behavior));
}
} catch (Exception e) {
LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
- coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM;
+ storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
}
// start/end region operation & sync on scanner is suggested by the
@@ -103,7 +103,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
region.startRegionOperation();
try {
synchronized (innerScanner) {
- return new AggregationScanner(type, filter, projector, aggregators, innerScanner, coprocessorBehavior);
+ return new AggregationScanner(type, filter, projector, aggregators, innerScanner, storageSideBehavior);
}
} finally {
region.closeRegionOperation();
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index be26142..a77f988 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -39,9 +39,9 @@ import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
public class AggregationScanner implements RegionScanner {
private RegionScanner outerScanner;
- private CoprocessorBehavior behavior;
+ private StorageSideBehavior behavior;
- public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, CoprocessorBehavior behavior) throws IOException {
+ public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, StorageSideBehavior behavior) throws IOException {
AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
@@ -79,23 +79,23 @@ public class AggregationScanner implements RegionScanner {
Cell cell = results.get(0);
tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- if (behavior == CoprocessorBehavior.SCAN) {
+ if (behavior == StorageSideBehavior.SCAN) {
//touch every byte of the cell so that the cost of scanning will be trully reflected
int endIndex = cell.getRowOffset() + cell.getRowLength();
for (int i = cell.getRowOffset(); i < endIndex; ++i) {
meaninglessByte += cell.getRowArray()[i];
}
} else {
- if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal()) {
+ if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal()) {
if (filter != null && filter.evaluate(tuple) == false)
continue;
- if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()) {
+ if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal()) {
AggrKey aggKey = projector.getAggrKey(results);
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, results);
- if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+ if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
aggCache.checkMemoryUsage();
}
}
@@ -103,7 +103,7 @@ public class AggregationScanner implements RegionScanner {
}
}
- if (behavior == CoprocessorBehavior.SCAN) {
+ if (behavior == StorageSideBehavior.SCAN) {
System.out.println("meaningless byte is now " + meaninglessByte);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index f0e9bed..394b3e2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -35,7 +35,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -75,14 +75,14 @@ public class ObserverEnabler {
if (localCoprocessor) {
RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
- AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM);
+ AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM);
return new ResultScannerAdapter(aggrScanner);
} else {
// debug/profiling purpose
String toggle = BackdoorToggles.getCoprocessorBehavior();
if (toggle == null) {
- toggle = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
+ toggle = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
} else {
logger.info("The execution of this query will use " + toggle + " as observer's behavior");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 5b48351..573951b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
@@ -47,7 +46,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
@@ -104,10 +102,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
- final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
-
- logger.info("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
-
Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
short shardNum = shardNumAndBaseShard.getFirst();
short cuboidBaseShard = shardNumAndBaseShard.getSecond();
@@ -130,39 +124,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
//TODO: raw scan can be constructed at region side to reduce traffic
List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
- int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
- while (true) {
- try {
- ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
- BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
- for (RawScan rs : rawScans) {
- RawScan.serializer.serialize(rs, rawScanBuffer);
- }
- rawScanBuffer.flip();
- rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
- break;
- } catch (BufferOverflowException boe) {
- logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
- rawScanBufferSize *= 4;
- }
- }
+ rawScanByteString = serializeRawScans(rawScans);
+
scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
-
- int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
- while (true) {
- try {
- ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
- GTScanRequest.serializer.serialize(scanRequest, buffer);
- buffer.flip();
- scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
- break;
- } catch (BufferOverflowException boe) {
- logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
- scanRequestBufferSize *= 4;
- }
- }
-
- logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
+ final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+ scanRequest.setTimeout(epResultItr.getRpcTimeout());
+ scanRequestByteString = serializeGTScanReq(scanRequest);
+
+ logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
for (RawScan rs : rawScans) {
@@ -172,7 +141,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size());
final AtomicLong totalScannedCount = new AtomicLong(0);
- final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
// KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries
// for different cubes until redeployment of coprocessor jar.
@@ -184,9 +152,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
builder.addHbaseColumnsToGT(intList);
}
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
- builder.setBehavior(toggle);
- builder.setStartTime(System.currentTimeMillis());
- builder.setTimeout(epResultItr.getRpcTimeout());
builder.setKylinProperties(kylinConfig.getConfigAsString());
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
@@ -260,6 +225,45 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
}
+ private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
+ ByteString scanRequestByteString;
+ int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+ GTScanRequest.serializer.serialize(scanRequest, buffer);
+ buffer.flip();
+ scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+ scanRequestBufferSize *= 4;
+ }
+ }
+ return scanRequestByteString;
+ }
+
+ private ByteString serializeRawScans(List<RawScan> rawScans) {
+ ByteString rawScanByteString;
+ int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+ BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+ for (RawScan rs : rawScans) {
+ RawScan.serializer.serialize(rs, rawScanBuffer);
+ }
+ rawScanBuffer.flip();
+ rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+ rawScanBufferSize *= 4;
+ }
+ }
+ return rawScanByteString;
+ }
+
private String getStatsString(byte[] region, CubeVisitResponse result) {
StringBuilder sb = new StringBuilder();
Stats stats = result.getStats();
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 442963f..f4729a3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -92,8 +92,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
if (coprocException instanceof GTScanSelfTerminatedException)
throw (GTScanSelfTerminatedException) coprocException;
else
- throw new RuntimeException("Error in coprocessor",coprocException);
-
+ throw new RuntimeException("Error in coprocessor", coprocException);
+
} else if (ret == null) {
throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 064d100..36adca1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -54,11 +54,11 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.gridtable.StorageSideBehavior;
import org.apache.kylin.measure.BufferedMeasureEncoder;
import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.CellListIterator;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
@@ -198,10 +198,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
for (IntList intList : request.getHbaseColumnsToGTList()) {
hbaseColumnsToGT.add(intList.getIntsList());
}
- CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+ StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
- appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime()));
+ appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime()));
MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
@Override
@@ -228,7 +228,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
- if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
+ if (behavior.ordinal() < StorageSideBehavior.SCAN.ordinal()) {
//this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
List<Cell> temp = Lists.newArrayList();
int counter = 0;
@@ -240,12 +240,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
appendProfileInfo(sb, "scanned " + counter);
}
- if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+ if (behavior.ordinal() < StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
scanReq.disableAggCacheMemCheck(); // disable mem check if so told
}
final MutableBoolean scanNormalComplete = new MutableBoolean(true);
- final long deadline = request.getTimeout() + this.serviceStartTime;
+ final long deadline = scanReq.getTimeout() + this.serviceStartTime;
final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
final CellListIterator cellListIterator = new CellListIterator() {
@@ -285,12 +285,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
};
IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, //
- request.getRowkeyPreambleSize(), CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(request.getBehavior()));
+ request.getRowkeyPreambleSize(), StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(scanReq.getStorageBehavior()));
IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
- behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
- behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
+ behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal(), //
+ behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);