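You are viewing a plain-text version of this content. The canonical (HTML) version, whose hyperlink was lost in this plain-text rendering, is available in the commits@kylin.apache.org mailing-list archive.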
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/09 09:48:09 UTC
[2/4] kylin git commit: KYLIN-1922 optimize needStorageAggregation check logic and make sure self-termination in coprocessor works
KYLIN-1922 optimize needStorageAggregation check logic and make sure self-termination in coprocessor works
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e38557b4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e38557b4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e38557b4
Branch: refs/heads/master
Commit: e38557b4d1cd1d42fe042e5500020cbfaba2d80b
Parents: e87c816
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Sep 9 15:57:25 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Sep 9 16:42:33 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 15 +-
.../apache/kylin/cube/RawQueryLastHacker.java | 7 +-
.../cube/gridtable/CubeScanRangePlanner.java | 340 ----------
.../kylin/gridtable/GTAggregateScanner.java | 10 +-
.../apache/kylin/gridtable/GTFilterScanner.java | 6 +-
.../GTScanExceedThresholdException.java | 2 +-
.../apache/kylin/gridtable/GTScanRequest.java | 34 +-
.../GTScanSelfTerminatedException.java | 26 +
.../kylin/gridtable/GTScanTimeoutException.java | 2 +-
.../gridtable/AggregationCacheSpillTest.java | 6 +-
.../kylin/gridtable/DictGridTableTest.java | 617 ------------------
.../storage/gtrecord/CubeScanRangePlanner.java | 357 +++++++++++
.../storage/gtrecord/CubeSegmentScanner.java | 14 +-
.../gtrecord/GTCubeStorageQueryBase.java | 36 +-
.../storage/gtrecord/DictGridTableTest.java | 626 +++++++++++++++++++
.../apache/kylin/query/ITKylinQueryTest.java | 55 +-
.../resources/query/sql_timeout/query01.sql | 19 +
.../common/coprocessor/CoprocessorBehavior.java | 1 +
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +-
.../hbase/cube/v2/ExpectedSizeIterator.java | 39 +-
.../hbase/cube/v2/HBaseReadonlyStore.java | 11 +-
.../coprocessor/endpoint/CubeVisitService.java | 26 +-
23 files changed, 1197 insertions(+), 1062 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f0c91da..2ac9d48 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -481,8 +481,8 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
- public int getCubeVisitTimeoutTimes() {
- return Integer.parseInt(getOptional("kylin.query.cube.visit.timeout.times", "1"));
+ public float getCubeVisitTimeoutTimes() {
+ return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1"));
}
public int getBadQueryStackTraceDepth() {
@@ -545,15 +545,6 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false"));
}
- public String getQueryStorageVisitPlanner() {
- return this.getOptional("kylin.query.storage.visit.planner", "org.apache.kylin.cube.gridtable.CubeScanRangePlanner");
- }
-
- // for test only
- public void setQueryStorageVisitPlanner(String v) {
- setProperty("kylin.query.storage.visit.planner", v);
- }
-
public int getQueryScanFuzzyKeyMax() {
return Integer.parseInt(this.getOptional("kylin.query.scan.fuzzykey.max", "200"));
}
@@ -573,7 +564,7 @@ abstract public class KylinConfigBase implements Serializable {
public boolean getQueryMetricsEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.metrics.enabled", "false"));
}
-
+
public int[] getQueryMetricsPercentilesIntervals() {
String[] dft = { "60", "300", "3600" };
return getOptionalIntArray("kylin.query.metrics.percentiles.intervals", dft);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java b/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java
index 63ddac5..50c644e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/RawQueryLastHacker.java
@@ -44,13 +44,14 @@ public class RawQueryLastHacker {
// We need to retrieve cube to manually add columns into sqlDigest, so that we have full-columns results as output.
boolean isSelectAll = sqlDigest.allColumns.isEmpty() || sqlDigest.allColumns.equals(sqlDigest.filterColumns);
for (TblColRef col : cubeDesc.listAllColumns()) {
- if (col.getTable().equals(sqlDigest.factTable) && (cubeDesc.listDimensionColumnsIncludingDerived().contains(col) || isSelectAll)) {
- sqlDigest.allColumns.add(col);
+ if (cubeDesc.listDimensionColumnsIncludingDerived().contains(col) || isSelectAll) {
+ if (col.getTable().equals(sqlDigest.factTable))
+ sqlDigest.allColumns.add(col);
}
}
for (TblColRef col : sqlDigest.allColumns) {
- if (cubeDesc.listDimensionColumnsIncludingDerived().contains(col)) {
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
// For dimension columns, take them as group by columns.
sqlDigest.groupbyColumns.add(col);
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java
deleted file mode 100644
index a937045..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeScanRangePlanner.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cube.gridtable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.FuzzyValueCombination;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanRequestBuilder;
-import org.apache.kylin.gridtable.GTUtil;
-import org.apache.kylin.gridtable.IGTComparator;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class CubeScanRangePlanner extends ScanRangePlannerBase {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class);
-
- protected int maxScanRanges;
- protected int maxFuzzyKeys;
-
- //non-GT
- protected CubeSegment cubeSegment;
- protected CubeDesc cubeDesc;
- protected Cuboid cuboid;
-
- public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, //
- Collection<FunctionDesc> metrics) {
-
- this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
- this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-
- this.cubeSegment = cubeSegment;
- this.cubeDesc = cubeSegment.getCubeDesc();
- this.cuboid = cuboid;
-
- Set<TblColRef> filterDims = Sets.newHashSet();
- TupleFilter.collectColumns(filter, filterDims);
-
- this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
- IGTComparator comp = gtInfo.getCodeSystem().getComparator();
- //start key GTRecord compare to start key GTRecord
- this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp);
- //stop key GTRecord compare to stop key GTRecord
- this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
- //start key GTRecord compare to stop key GTRecord
- this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
-
- //replace the constant values in filter to dictionary codes
- this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupbyDims);
-
- this.gtDimensions = mapping.makeGridTableColumns(dimensions);
- this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc()));
- this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
- this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
-
- if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
- int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
- if (index >= 0) {
- SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
- this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
- this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
- this.gtPartitionCol = gtInfo.colRef(index);
- }
- }
-
- }
-
- /**
- * constrcut GTScanRangePlanner with incomplete information. only be used for UT
- * @param info
- * @param gtStartAndEnd
- * @param gtPartitionCol
- * @param gtFilter
- */
- public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
-
- this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
- this.maxFuzzyKeys = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-
- this.gtInfo = info;
-
- IGTComparator comp = gtInfo.getCodeSystem().getComparator();
- //start key GTRecord compare to start key GTRecord
- this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp);
- //stop key GTRecord compare to stop key GTRecord
- this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
- //start key GTRecord compare to stop key GTRecord
- this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
-
- this.gtFilter = gtFilter;
- this.gtStartAndEnd = gtStartAndEnd;
- this.gtPartitionCol = gtPartitionCol;
- }
-
- public GTScanRequest planScanRequest() {
- GTScanRequest scanRequest;
- List<GTScanRange> scanRanges = this.planScanRanges();
- if (scanRanges != null && scanRanges.size() != 0) {
- scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest();
- } else {
- scanRequest = null;
- }
- return scanRequest;
- }
-
- /**
- * Overwrite this method to provide smarter storage visit plans
- * @return
- */
- public List<GTScanRange> planScanRanges() {
- TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
-
- List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
-
- List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
- for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
- GTScanRange scanRange = newScanRange(andDimRanges);
- if (scanRange != null)
- scanRanges.add(scanRange);
- }
-
- List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
- mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
-
- return mergedRanges;
- }
-
- private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
- Set<TblColRef> ret = Sets.newHashSet();
- for (TblColRef col : input) {
- if (cubeDesc.hasHostColumn(col)) {
- for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
- ret.add(host);
- }
- } else {
- ret.add(col);
- }
- }
- return ret;
- }
-
- protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
- GTRecord pkStart = new GTRecord(gtInfo);
- GTRecord pkEnd = new GTRecord(gtInfo);
- Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
-
- List<GTRecord> fuzzyKeys;
-
- for (ColumnRange range : andDimRanges) {
- if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
- int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
- int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
- if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
- //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition.
- } else {
- logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
- gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
- return null;
- }
- }
-
- int col = range.column.getColumnDesc().getZeroBasedIndex();
- if (!gtInfo.getPrimaryKey().get(col))
- continue;
-
- pkStart.set(col, range.begin);
- pkEnd.set(col, range.end);
-
- if (range.valueSet != null && !range.valueSet.isEmpty()) {
- fuzzyValues.put(col, range.valueSet);
- }
- }
-
- fuzzyKeys =
-
- buildFuzzyKeys(fuzzyValues);
- return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
- }
-
- private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
- ArrayList<GTRecord> result = Lists.newArrayList();
-
- if (fuzzyValueSet.isEmpty())
- return result;
-
- // debug/profiling purpose
- if (BackdoorToggles.getDisableFuzzyKey()) {
- logger.info("The execution of this query will not use fuzzy key");
- return result;
- }
-
- List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys);
-
- for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
-
- // BitSet bitSet = new BitSet(gtInfo.getColumnCount());
- // for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
- // bitSet.set(entry.getKey());
- // }
- GTRecord fuzzy = new GTRecord(gtInfo);
- for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
- fuzzy.set(entry.getKey(), entry.getValue());
- }
-
- result.add(fuzzy);
- }
- return result;
- }
-
- protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
- if (ranges.size() <= 1) {
- return ranges;
- }
-
- // sort ranges by start key
- Collections.sort(ranges, new Comparator<GTScanRange>() {
- @Override
- public int compare(GTScanRange a, GTScanRange b) {
- return rangeStartComparator.compare(a.pkStart, b.pkStart);
- }
- });
-
- // merge the overlap range
- List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
- int mergeBeginIndex = 0;
- GTRecord mergeEnd = ranges.get(0).pkEnd;
- for (int index = 1; index < ranges.size(); index++) {
- GTScanRange range = ranges.get(index);
-
- // if overlap, swallow it
- if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) {
- mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd);
- continue;
- }
-
- // not overlap, split here
- GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
- mergedRanges.add(mergedRange);
-
- // start new split
- mergeBeginIndex = index;
- mergeEnd = range.pkEnd;
- }
-
- // don't miss the last range
- GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
- mergedRanges.add(mergedRange);
-
- return mergedRanges;
- }
-
- private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
- GTScanRange first = ranges.get(0);
- if (ranges.size() == 1)
- return first;
-
- GTRecord start = first.pkStart;
- GTRecord end = first.pkEnd;
- List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>();
-
- boolean hasNonFuzzyRange = false;
- for (GTScanRange range : ranges) {
- hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
- newFuzzyKeys.addAll(range.fuzzyKeys);
- end = rangeEndComparator.max(end, range.pkEnd);
- }
-
- // if any range is non-fuzzy, then all fuzzy keys must be cleared
- // also too many fuzzy keys will slow down HBase scan
- if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) {
- newFuzzyKeys.clear();
- }
-
- return new GTScanRange(start, end, newFuzzyKeys);
- }
-
- protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
- if (ranges.size() <= maxRanges) {
- return ranges;
- }
-
- // TODO: check the distance between range and merge the large distance range
- List<GTScanRange> result = new ArrayList<GTScanRange>(1);
- GTScanRange mergedRange = mergeKeyRange(ranges);
- result.add(mergedRange);
- return result;
- }
-
- public int getMaxScanRanges() {
- return maxScanRanges;
- }
-
- public void setMaxScanRanges(int maxScanRanges) {
- this.maxScanRanges = maxScanRanges;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index ccf4895..db38484 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -138,7 +138,10 @@ public class GTAggregateScanner implements IGTScanner {
long count = 0;
for (GTRecord r : inputScanner) {
- count++;
+ //check deadline
+ if (count % GTScanRequest.terminateCheckInterval == 1 && System.currentTimeMillis() > deadline) {
+ throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
+ }
if (getNumOfSpills() == 0) {
//check limit
@@ -152,10 +155,7 @@ public class GTAggregateScanner implements IGTScanner {
aggrCache.aggregate(r, Integer.MAX_VALUE);
}
- //check deadline
- if (count % 10000 == 1 && System.currentTimeMillis() > deadline) {
- throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
- }
+ count++;
}
logger.info("GTAggregateScanner input rows: " + count);
return aggrCache.iterator();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 31a9599..f1f84af 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -132,12 +132,12 @@ public class GTFilterScanner implements IGTScanner {
}
// cache the last one input and result, can reuse because rowkey are ordered, and same input could come in small group
- static class FilterResultCache {
+ public static class FilterResultCache {
static final int CHECKPOINT = 10000;
static final double HIT_RATE_THRESHOLD = 0.5;
- static boolean ENABLED = true; // enable cache by default
+ public static boolean ENABLED = true; // enable cache by default
- boolean enabled = ENABLED;
+ public boolean enabled = ENABLED;
ImmutableBitSet colsInFilter;
int count;
int hit;
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
index dd57e90..ba75962 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
@@ -18,7 +18,7 @@
package org.apache.kylin.gridtable;
-public class GTScanExceedThresholdException extends RuntimeException {
+public class GTScanExceedThresholdException extends GTScanSelfTerminatedException {
public GTScanExceedThresholdException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4cfba1b..5d27028 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Sets;
public class GTScanRequest {
private static final Logger logger = LoggerFactory.getLogger(GTScanRequest.class);
+ //it's not necessary to increase the checkInterval to very large because the check cost is not high
+ public static final int terminateCheckInterval = 1000;
private GTInfo info;
private List<GTScanRange> ranges;
@@ -55,13 +57,16 @@ public class GTScanRequest {
private ImmutableBitSet aggrGroupBy;
private ImmutableBitSet aggrMetrics;
private String[] aggrMetricsFuncs;//
-
+
// hint to storage behavior
private boolean allowStorageAggregation;
private double aggCacheMemThreshold;
private int storageScanRowNumThreshold;
private int storagePushDownLimit;
+ // runtime computed fields
+ private transient boolean doingStorageAggregation = false;
+
GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
@@ -169,6 +174,7 @@ public class GTScanRequest {
logger.info("pre aggregation is not beneficial, skip it");
} else if (this.hasAggregation()) {
logger.info("pre aggregating results before returning");
+ this.doingStorageAggregation = true;
result = new GTAggregateScanner(result, this, deadline);
} else {
logger.info("has no aggregation, skip it");
@@ -178,6 +184,10 @@ public class GTScanRequest {
}
+ public boolean isDoingStorageAggregation() {
+ return doingStorageAggregation;
+ }
+
//touch every byte of the cell so that the cost of scanning will be truly reflected
private int lookAndForget(IGTScanner scanner) {
byte meaninglessByte = 0;
@@ -215,8 +225,8 @@ public class GTScanRequest {
return ranges;
}
- public void setGTScanRanges(List<GTScanRange> ranges) {
- this.ranges = ranges;
+ public void clearScanRanges() {
+ this.ranges = Lists.newArrayList();
}
public ImmutableBitSet getSelectedColBlocks() {
@@ -251,10 +261,6 @@ public class GTScanRequest {
return allowStorageAggregation;
}
- public void setAllowStorageAggregation(boolean allowStorageAggregation) {
- this.allowStorageAggregation = allowStorageAggregation;
- }
-
public double getAggCacheMemThreshold() {
if (aggCacheMemThreshold < 0)
return 0;
@@ -262,28 +268,18 @@ public class GTScanRequest {
return aggCacheMemThreshold;
}
- public void setAggCacheMemThreshold(double gb) {
- this.aggCacheMemThreshold = gb;
+ public void disableAggCacheMemCheck() {
+ this.aggCacheMemThreshold = 0;
}
public int getStorageScanRowNumThreshold() {
return storageScanRowNumThreshold;
}
- public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
- logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold);
- this.storageScanRowNumThreshold = storageScanRowNumThreshold;
- }
-
public int getStoragePushDownLimit() {
return this.storagePushDownLimit;
}
- public void setStoragePushDownLimit(int limit) {
- logger.info("storagePushDownLimit is set to " + storagePushDownLimit);
- this.storagePushDownLimit = limit;
- }
-
@Override
public String toString() {
return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
new file mode 100644
index 0000000..4775ac6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanSelfTerminatedException extends RuntimeException {
+
+ public GTScanSelfTerminatedException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
index e92dae3..17a8d02 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
@@ -18,7 +18,7 @@
package org.apache.kylin.gridtable;
-public class GTScanTimeoutException extends RuntimeException {
+public class GTScanTimeoutException extends GTScanSelfTerminatedException {
public GTScanTimeoutException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index b5f6de7..38b8c90 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -84,8 +84,7 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
}
};
- GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
- scanRequest.setAggCacheMemThreshold(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).setAggCacheMemThreshold(0.5).createGTScanRequest();
GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
@@ -127,8 +126,7 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
};
// all-in-mem testcase
- GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
- scanRequest.setAggCacheMemThreshold(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).setAggCacheMemThreshold(0.5).createGTScanRequest();
GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
deleted file mode 100644
index 7b6d3fa..0000000
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ /dev/null
@@ -1,617 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.gridtable;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.gridtable.CubeCodeSystem;
-import org.apache.kylin.cube.gridtable.CubeScanRangePlanner;
-import org.apache.kylin.dict.NumberDictionaryBuilder;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.dimension.DictionaryDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
-import org.apache.kylin.gridtable.GTInfo.Builder;
-import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.LongMutable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.ExtractTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class DictGridTableTest extends LocalFileMetadataTestCase {
-
- private GridTable table;
- private GTInfo info;
- private CompareTupleFilter timeComp0;
- private CompareTupleFilter timeComp1;
- private CompareTupleFilter timeComp2;
- private CompareTupleFilter timeComp3;
- private CompareTupleFilter timeComp4;
- private CompareTupleFilter timeComp5;
- private CompareTupleFilter timeComp6;
- private CompareTupleFilter ageComp1;
- private CompareTupleFilter ageComp2;
- private CompareTupleFilter ageComp3;
- private CompareTupleFilter ageComp4;
-
- @After
- public void after() throws Exception {
-
- this.cleanupTestMetadata();
- }
-
- @Before
- public void setup() throws IOException {
-
- this.createTestMetadata();
-
- table = newTestTable();
- info = table.getInfo();
-
- timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
- timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
- timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
- timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
- timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
- timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
- timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
- ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
- ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
- ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
- ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
-
- }
-
- @Test
- public void verifySegmentSkipping() {
-
- ByteArray segmentStart = enc(info, 0, "2015-01-14");
- ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
- assertEquals(segmentStart, segmentStartX);
-
- {
- LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());//scan range are [close,close]
- assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
- assertEquals(1, r.get(0).fuzzyKeys.size());
- assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
- }
- {
- LogicalTupleFilter filter = and(timeComp2, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
- }
- {
- LogicalTupleFilter filter = and(timeComp4, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
- }
- {
- LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
- }
- {
- LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
- assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
- }
- {
- LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
- assertEquals(0, r.get(0).fuzzyKeys.size());
- }
- {
- //skip FALSE filter
- LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
- }
- {
- //TRUE or FALSE filter
- LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[null, null]-[null, null]", r.get(0).toString());
- }
- {
- //TRUE or other filter
- LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[null, null]-[null, null]", r.get(0).toString());
- }
- }
-
- @Test
- public void verifySegmentSkipping2() {
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
- {
- LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());//scan range are [close,close]
- assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
- assertEquals(1, r.get(0).fuzzyKeys.size());
- assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
- }
-
- {
- LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());//scan range are [close,close]
- }
- }
-
- @Test
- public void verifyScanRangePlanner() {
-
- // flatten or-and & hbase fuzzy value
- {
- LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
- assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
- }
-
- // pre-evaluate ever false
- {
- LogicalTupleFilter filter = and(timeComp1, timeComp2);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
- }
-
- // pre-evaluate ever true
- {
- LogicalTupleFilter filter = or(timeComp1, ageComp4);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals("[[null, null]-[null, null]]", r.toString());
- }
-
- // merge overlap range
- {
- LogicalTupleFilter filter = or(timeComp1, timeComp3);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals("[[null, null]-[null, null]]", r.toString());
- }
-
- // merge too many ranges
- {
- LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
- List<GTScanRange> r = planner.planScanRanges();
- assertEquals(3, r.size());
- assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
- assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
- assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
- planner.setMaxScanRanges(2);
- List<GTScanRange> r2 = planner.planScanRanges();
- assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
- }
- }
-
- @Test
- public void verifyFirstRow() throws IOException {
- doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
- "[1421193600000, 30, Luke, 10, 10.5]", //
- "[1421280000000, 20, Dong, 10, 10.5]", //
- "[1421280000000, 20, Jason, 10, 10.5]", //
- "[1421280000000, 30, Xu, 10, 10.5]", //
- "[1421366400000, 20, Mahone, 10, 10.5]", //
- "[1421366400000, 20, Qianhao, 10, 10.5]", //
- "[1421366400000, 30, George, 10, 10.5]", //
- "[1421366400000, 30, Shaofeng, 10, 10.5]", //
- "[1421452800000, 10, Kejia, 10, 10.5]");
- }
-
- //for testing GTScanRequest serialization and deserialization
- public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
- ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- GTScanRequest.serializer.serialize(origin, buffer);
- buffer.flip();
- GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
-
- Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
- Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
- return sGTScanRequest;
- }
-
- @Test
- public void verifyScanWithUnevaluatableFilter() throws IOException {
- GTInfo info = table.getInfo();
-
- CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
- ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
- LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
- LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
-
- GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
-
- // note the unEvaluatable column 1 in filter is added to group by
- assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
-
- doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
- }
-
- @Test
- public void verifyScanWithEvaluatableFilter() throws IOException {
- GTInfo info = table.getInfo();
-
- CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
- CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
- // note the evaluatable column 1 in filter is added to returned columns but not in group by
- assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
-
- doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
- }
-
- @Test
- public void testFilterScannerPerf() throws IOException {
- GridTable table = newTestPerfTable();
- GTInfo info = table.getInfo();
-
- CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
- CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- FilterResultCache.ENABLED = false;
- testFilterScannerPerfInner(table, info, filter);
- FilterResultCache.ENABLED = true;
- testFilterScannerPerfInner(table, info, filter);
- FilterResultCache.ENABLED = false;
- testFilterScannerPerfInner(table, info, filter);
- FilterResultCache.ENABLED = true;
- testFilterScannerPerfInner(table, info, filter);
- }
-
- @SuppressWarnings("unused")
- private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
- long start = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
- IGTScanner scanner = table.scan(req);
- int i = 0;
- for (GTRecord r : scanner) {
- i++;
- }
- scanner.close();
- long end = System.currentTimeMillis();
- System.out.println((end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows");
- }
-
- @Test
- public void verifyConvertFilterConstants1() {
- GTInfo info = table.getInfo();
-
- TableDesc extTable = TableDesc.mockup("ext");
- TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
- TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
-
- CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
- CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- List<TblColRef> colMapping = Lists.newArrayList();
- colMapping.add(extColA);
- colMapping.add(extColB);
-
- TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
- assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
- }
-
- @Test
- public void verifyConvertFilterConstants2() {
- GTInfo info = table.getInfo();
-
- TableDesc extTable = TableDesc.mockup("ext");
- TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
- TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
-
- CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
- CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9");
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- List<TblColRef> colMapping = Lists.newArrayList();
- colMapping.add(extColA);
- colMapping.add(extColB);
-
- // $1<"9" round up to $1<"10"
- TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
- assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
- }
-
- @Test
- public void verifyConvertFilterConstants3() {
- GTInfo info = table.getInfo();
-
- TableDesc extTable = TableDesc.mockup("ext");
- TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
- TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
-
- CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
- CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9");
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- List<TblColRef> colMapping = Lists.newArrayList();
- colMapping.add(extColA);
- colMapping.add(extColB);
-
- // $1<="9" round down to FALSE
- TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
- assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
- }
-
- @Test
- public void verifyConvertFilterConstants4() {
- GTInfo info = table.getInfo();
-
- TableDesc extTable = TableDesc.mockup("ext");
- TblColRef extColA = ColumnDesc.mockup(extTable, 1, "A", "timestamp").getRef();
- TblColRef extColB = ColumnDesc.mockup(extTable, 2, "B", "integer").getRef();
-
- CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
- CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
- LogicalTupleFilter filter = and(fComp1, fComp2);
-
- List<TblColRef> colMapping = Lists.newArrayList();
- colMapping.add(extColA);
- colMapping.add(extColB);
-
- // $1 in ("9", "10", "15") has only "10" left
- TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
- assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
- }
-
- private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
- System.out.println(req);
- IGTScanner scanner = table.scan(req);
- int i = 0;
- for (GTRecord r : scanner) {
- System.out.println(r);
- if (verifyRows == null || i >= verifyRows.length) {
- Assert.fail();
- }
- assertEquals(verifyRows[i], r.toString());
- i++;
- }
- scanner.close();
- }
-
- public static ByteArray enc(GTInfo info, int col, String value) {
- ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
- info.codeSystem.encodeColumnValue(col, value, buf);
- return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
- }
-
- public static ExtractTupleFilter unevaluatable(TblColRef col) {
- ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
- r.addChild(new ColumnTupleFilter(col));
- return r;
- }
-
- public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
- CompareTupleFilter result = new CompareTupleFilter(op);
- result.addChild(new ColumnTupleFilter(col));
- result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
- return result;
- }
-
- public static LogicalTupleFilter and(TupleFilter... children) {
- return logic(FilterOperatorEnum.AND, children);
- }
-
- public static LogicalTupleFilter or(TupleFilter... children) {
- return logic(FilterOperatorEnum.OR, children);
- }
-
- public static LogicalTupleFilter not(TupleFilter child) {
- return logic(FilterOperatorEnum.NOT, child);
- }
-
- public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
- LogicalTupleFilter result = new LogicalTupleFilter(op);
- for (TupleFilter c : children) {
- result.addChild(c);
- }
- return result;
- }
-
- public static GridTable newTestTable() throws IOException {
- GTInfo info = newInfo();
- GTSimpleMemStore store = new GTSimpleMemStore(info);
- GridTable table = new GridTable(info, store);
-
- GTRecord r = new GTRecord(table.getInfo());
- GTBuilder builder = table.rebuild();
-
- builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
- builder.close();
-
- return table;
- }
-
- static GridTable newTestPerfTable() throws IOException {
- GTInfo info = newInfo();
- GTSimpleMemStore store = new GTSimpleMemStore(info);
- GridTable table = new GridTable(info, store);
-
- GTRecord r = new GTRecord(table.getInfo());
- GTBuilder builder = table.rebuild();
-
- for (int i = 0; i < 100000; i++) {
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
-
- for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
- }
- builder.close();
-
- return table;
- }
-
- static GTInfo newInfo() {
- Builder builder = GTInfo.builder();
- builder.setCodeSystem(newDictCodeSystem());
- builder.setColumns( //
- DataType.getType("timestamp"), //
- DataType.getType("integer"), //
- DataType.getType("varchar(10)"), //
- DataType.getType("bigint"), //
- DataType.getType("decimal") //
- );
- builder.setPrimaryKey(setOf(0, 1));
- builder.setColumnPreferIndex(setOf(0));
- builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
- builder.enableRowBlock(4);
- GTInfo info = builder.build();
- return info;
- }
-
- @SuppressWarnings("unchecked")
- private static CubeCodeSystem newDictCodeSystem() {
- DimensionEncoding[] dimEncs = new DimensionEncoding[3];
- dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
- dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
- return new CubeCodeSystem(dimEncs);
- }
-
- @SuppressWarnings("rawtypes")
- private static Dictionary newDictionaryOfString() {
- TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
- builder.addValue("Dong");
- builder.addValue("George");
- builder.addValue("Jason");
- builder.addValue("Kejia");
- builder.addValue("Luke");
- builder.addValue("Mahone");
- builder.addValue("Qianhao");
- builder.addValue("Shaofeng");
- builder.addValue("Xu");
- builder.addValue("Yang");
- return builder.build(0);
- }
-
- @SuppressWarnings("rawtypes")
- private static Dictionary newDictionaryOfInteger() {
- NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<>(new StringBytesConverter());
- builder.addValue("10");
- builder.addValue("20");
- builder.addValue("30");
- builder.addValue("40");
- builder.addValue("50");
- builder.addValue("60");
- builder.addValue("70");
- builder.addValue("80");
- builder.addValue("90");
- builder.addValue("100");
- return builder.build(0);
- }
-
- public static ImmutableBitSet setOf(int... values) {
- BitSet set = new BitSet();
- for (int i : values)
- set.set(i);
- return new ImmutableBitSet(set);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
new file mode 100644
index 0000000..9f505f3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.FuzzyValueCombination;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.gridtable.RecordComparators;
+import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
+import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Plans the grid-table scan (key ranges, fuzzy keys, pushed-down filter, aggregation settings)
+ * for one cuboid of one cube segment.
+ */
+public class CubeScanRangePlanner extends ScanRangePlannerBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeScanRangePlanner.class);
+
+    /** Maximum number of scan ranges; above it, all ranges are merged into a single one. */
+    protected int maxScanRanges;
+    /** Maximum number of fuzzy keys; fuzzy keys are dropped from a merged range exceeding it. */
+    protected int maxFuzzyKeys;
+
+    // non-GT: cube-level objects, left unset by the unit-test-only constructor
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+    protected Cuboid cuboid;
+
+    /** Query-level settings used by {@link #planScanRequest()}; null for the unit-test-only constructor. */
+    protected StorageContext context;
+
+    /**
+     * Creates a planner for scanning {@code cuboid} within {@code cubeSegment}.
+     *
+     * @param cubeSegment the segment to scan
+     * @param cuboid the cuboid to scan
+     * @param filter the query filter on cube columns; constants are converted to dictionary codes
+     * @param dimensions the dimensions to return
+     * @param groupbyDims the group-by dimensions; derived columns are replaced by their host columns
+     * @param metrics the measures to aggregate
+     * @param context supplies the aggregation, threshold and limit settings of the scan request
+     */
+    public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, //
+            Collection<FunctionDesc> metrics, StorageContext context) {
+        this.context = context;
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        this.maxScanRanges = config.getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeys = config.getQueryScanFuzzyKeyMax();
+
+        this.cubeSegment = cubeSegment;
+        this.cubeDesc = cubeSegment.getCubeDesc();
+        this.cuboid = cuboid;
+
+        this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        initComparators();
+
+        // replace the constant values in filter to dictionary codes
+        this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), groupbyDims);
+
+        this.gtDimensions = mapping.makeGridTableColumns(dimensions);
+        this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupbyDims, cubeDesc));
+        this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+        this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
+
+        // remember the segment's partition-column bounds so ranges outside the segment can be pruned
+        if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
+            int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
+            if (index >= 0) {
+                SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
+                this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
+                this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
+                this.gtPartitionCol = gtInfo.colRef(index);
+            }
+        }
+    }
+
+    /**
+     * Constructs a planner with incomplete information; only to be used by unit tests.
+     * <p>
+     * Segment, cuboid and storage context are left unset, so only {@link #planScanRanges()} is
+     * meaningful on such an instance.
+     *
+     * @param info the grid table info
+     * @param gtStartAndEnd the segment start and end values of the partition column, in GT encoding
+     * @param gtPartitionCol the partition column, or null if none
+     * @param gtFilter the filter, already expressed on GT columns and codes
+     */
+    public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        this.maxScanRanges = config.getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeys = config.getQueryScanFuzzyKeyMax();
+
+        this.gtInfo = info;
+
+        initComparators();
+
+        this.gtFilter = gtFilter;
+        this.gtStartAndEnd = gtStartAndEnd;
+        this.gtPartitionCol = gtPartitionCol;
+    }
+
+    /** Initializes the range comparators from the code system of {@code gtInfo}. */
+    private void initComparators() {
+        IGTComparator comp = gtInfo.getCodeSystem().getComparator();
+        // start key GTRecord compare to start key GTRecord
+        this.rangeStartComparator = RecordComparators.getRangeStartComparator(comp);
+        // stop key GTRecord compare to stop key GTRecord
+        this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
+        // start key GTRecord compare to stop key GTRecord
+        this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
+    }
+
+    /**
+     * Plans the scan ranges and wraps them, with the query's aggregation and limit settings, into a request.
+     *
+     * @return the scan request, or null when there is no range to scan
+     * @throws IllegalStateException if ranges exist but this planner was built by the unit-test-only constructor
+     */
+    public GTScanRequest planScanRequest() {
+        List<GTScanRange> scanRanges = this.planScanRanges();
+        if (scanRanges == null || scanRanges.isEmpty()) {
+            return null;
+        }
+
+        // the unit-test-only constructor leaves these unset; fail with a clear message instead of an NPE
+        if (context == null || cubeSegment == null || cubeDesc == null) {
+            throw new IllegalStateException("planScanRequest() requires a planner created with a cube segment and a storage context");
+        }
+
+        GTScanRequestBuilder builder = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).//
+                setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).//
+                setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).//
+                setStorageScanRowNumThreshold(context.getThreshold());
+
+        if (cubeDesc.supportsLimitPushDown()) {
+            builder.setStoragePushDownLimit(context.getStoragePushDownLimit());
+        }
+        return builder.createGTScanRequest();
+    }
+
+    /**
+     * Translates the filter into scan ranges: one range per AND clause of the flattened OR-of-AND
+     * filter (clauses that fail the segment partition pre-check are dropped), then overlapping ranges
+     * are merged, and everything is merged into one range if more than {@code maxScanRanges} remain.
+     * Override this method to provide smarter storage visit plans.
+     *
+     * @return the planned scan ranges, possibly empty
+     */
+    public List<GTScanRange> planScanRanges() {
+        TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
+
+        List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
+
+        List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
+        for (Collection<ColumnRange> andDimRanges : orAndDimRanges) {
+            GTScanRange scanRange = newScanRange(andDimRanges);
+            if (scanRange != null)
+                scanRanges.add(scanRange);
+        }
+
+        List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
+        mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
+
+        return mergedRanges;
+    }
+
+    /** Returns {@code input} with every derived column replaced by the host columns it is derived from. */
+    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+        Set<TblColRef> ret = Sets.newHashSet();
+        for (TblColRef col : input) {
+            if (cubeDesc.hasHostColumn(col)) {
+                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+                    ret.add(host);
+                }
+            } else {
+                ret.add(col);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Builds the scan range for one AND clause.
+     *
+     * @param andDimRanges the per-column ranges of the clause
+     * @return the range, or null when the clause's partition-column range misses this segment
+     */
+    protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+        GTRecord pkStart = new GTRecord(gtInfo);
+        GTRecord pkEnd = new GTRecord(gtInfo);
+        Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
+
+        for (ColumnRange range : andDimRanges) {
+            if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
+                int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
+                int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
+
+                // segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when
+                // using dict encoding, so use <= when has equals in condition.
+                boolean overlapsSegment = isPartitionColUsingDatetimeEncoding //
+                        ? (endCompare <= 0 && beginCompare < 0) //
+                        : (endCompare <= 0 && beginCompare <= 0);
+                if (!overlapsSegment) {
+                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
+                            gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
+                    return null;
+                }
+            }
+
+            int col = range.column.getColumnDesc().getZeroBasedIndex();
+            // only primary-key columns can bound the row key range
+            if (!gtInfo.getPrimaryKey().get(col))
+                continue;
+
+            pkStart.set(col, range.begin);
+            pkEnd.set(col, range.end);
+
+            if (range.valueSet != null && !range.valueSet.isEmpty()) {
+                fuzzyValues.put(col, range.valueSet);
+            }
+        }
+
+        List<GTRecord> fuzzyKeys = buildFuzzyKeys(fuzzyValues);
+        return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
+    }
+
+    /**
+     * Expands the per-column value sets into fuzzy-key records (combinations capped by {@code maxFuzzyKeys}).
+     *
+     * @return the fuzzy keys; empty when there are no value sets or fuzzy keys are disabled by backdoor toggle
+     */
+    private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) {
+        List<GTRecord> result = Lists.newArrayList();
+
+        if (fuzzyValueSet.isEmpty())
+            return result;
+
+        // debug/profiling purpose
+        if (BackdoorToggles.getDisableFuzzyKey()) {
+            logger.info("The execution of this query will not use fuzzy key");
+            return result;
+        }
+
+        List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, maxFuzzyKeys);
+
+        for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
+            GTRecord fuzzy = new GTRecord(gtInfo);
+            for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
+                fuzzy.set(entry.getKey(), entry.getValue());
+            }
+            result.add(fuzzy);
+        }
+        return result;
+    }
+
+    /**
+     * Merges ranges that overlap once sorted by start key. The input list is not modified.
+     *
+     * @param ranges the ranges to merge
+     * @return the merged ranges in start-key order ({@code ranges} itself if it has at most one element)
+     */
+    protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+        if (ranges.size() <= 1) {
+            return ranges;
+        }
+
+        // sort a copy by start key so the caller's list is left untouched
+        List<GTScanRange> sorted = new ArrayList<GTScanRange>(ranges);
+        Collections.sort(sorted, new Comparator<GTScanRange>() {
+            @Override
+            public int compare(GTScanRange a, GTScanRange b) {
+                return rangeStartComparator.compare(a.pkStart, b.pkStart);
+            }
+        });
+
+        // merge the overlap range
+        List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
+        int mergeBeginIndex = 0;
+        GTRecord mergeEnd = sorted.get(0).pkEnd;
+        for (int index = 1; index < sorted.size(); index++) {
+            GTScanRange range = sorted.get(index);
+
+            // if overlap, swallow it
+            if (rangeStartEndComparator.compare(range.pkStart, mergeEnd) <= 0) {
+                mergeEnd = rangeEndComparator.max(mergeEnd, range.pkEnd);
+                continue;
+            }
+
+            // not overlap, split here
+            mergedRanges.add(mergeKeyRange(sorted.subList(mergeBeginIndex, index)));
+
+            // start new split
+            mergeBeginIndex = index;
+            mergeEnd = range.pkEnd;
+        }
+
+        // don't miss the last range
+        mergedRanges.add(mergeKeyRange(sorted.subList(mergeBeginIndex, sorted.size())));
+
+        return mergedRanges;
+    }
+
+    /**
+     * Merges a non-empty run of ranges into one: start key of the first range, greatest end key,
+     * and the union of fuzzy keys (cleared if any range is non-fuzzy or the union exceeds
+     * {@code maxFuzzyKeys}). Assumes the first range has the smallest start key.
+     */
+    private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
+        GTScanRange first = ranges.get(0);
+        if (ranges.size() == 1)
+            return first;
+
+        GTRecord start = first.pkStart;
+        GTRecord end = first.pkEnd;
+        List<GTRecord> newFuzzyKeys = new ArrayList<GTRecord>();
+
+        boolean hasNonFuzzyRange = false;
+        for (GTScanRange range : ranges) {
+            hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty();
+            newFuzzyKeys.addAll(range.fuzzyKeys);
+            end = rangeEndComparator.max(end, range.pkEnd);
+        }
+
+        // if any range is non-fuzzy, then all fuzzy keys must be cleared
+        // also too many fuzzy keys will slow down HBase scan
+        if (hasNonFuzzyRange || newFuzzyKeys.size() > maxFuzzyKeys) {
+            newFuzzyKeys.clear();
+        }
+
+        return new GTScanRange(start, end, newFuzzyKeys);
+    }
+
+    /**
+     * Collapses all ranges into a single one when there are more than {@code maxRanges}.
+     *
+     * @param ranges ranges in start-key order
+     * @param maxRanges the maximum number of ranges to keep separate
+     * @return {@code ranges} unchanged if empty or within the limit, otherwise a single merged range
+     */
+    protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+        // an empty list has nothing to merge (and mergeKeyRange needs at least one range)
+        if (ranges.isEmpty() || ranges.size() <= maxRanges) {
+            return ranges;
+        }
+
+        // TODO: check the distance between range and merge the large distance range
+        List<GTScanRange> result = new ArrayList<GTScanRange>(1);
+        result.add(mergeKeyRange(ranges));
+        return result;
+    }
+
+    public int getMaxScanRanges() {
+        return maxScanRanges;
+    }
+
+    public void setMaxScanRanges(int maxScanRanges) {
+        this.maxScanRanges = maxScanRanges;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 3b9d9c6..f32831a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -23,10 +23,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeScanRangePlanner;
import org.apache.kylin.dict.BuiltInFunctionTransformer;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
@@ -67,23 +65,13 @@ public class CubeSegmentScanner implements IGTScanner {
ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
filter = translator.transform(filter);
- String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner();
CubeScanRangePlanner scanRangePlanner;
try {
- scanRangePlanner = (CubeScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics);
+ scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, context);
} catch (Exception e) {
throw new RuntimeException(e);
}
scanRequest = scanRangePlanner.planScanRequest();
- if (scanRequest != null) {
- scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation());
- scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
- scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: devide by shard number?
-
- if (cubeSeg.getCubeDesc().supportsLimitPushDown()) {
- scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit());
- }
- }
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e38557b4/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 86346f8..f0c2494 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.gtrecord;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -72,10 +73,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
+
//cope with queries with no aggregations
RawQueryLastHacker.hackNoAggregations(sqlDigest, cubeDesc);
-
+
// Customized measure taking effect: e.g. allow custom measures to help raw queries
notifyBeforeStorageQuery(sqlDigest);
@@ -112,9 +113,9 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
- context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
- enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
- setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+ context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
+ enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
+ setThresholdIfNecessary(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -229,9 +230,22 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return resultD;
}
- public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD, boolean isExactAggregation) {
- logger.info("Set isNeedStorageAggregation to " + !isExactAggregation);
- return !isExactAggregation;
+    /**
+     * Decides whether the storage layer must aggregate the rows of {@code cuboid}.
+     * <p>
+     * Aggregation can be skipped only when every cuboid column is either grouped by or fixed to a
+     * single value, because then each cuboid row already forms its own group.
+     *
+     * @param cuboid the cuboid being scanned
+     * @param groupD the group-by columns (derived columns already translated)
+     * @param singleValueD the columns constrained to a single value (derived columns already translated)
+     * @return {@code true} if storage-side aggregation is required
+     */
+    public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD) {
+
+        logger.info("GroupD :" + groupD);
+        logger.info("SingleValueD :" + singleValueD);
+        logger.info("Cuboid columns :" + cuboid.getColumns());
+
+        HashSet<TblColRef> covered = Sets.newHashSet();
+        covered.addAll(groupD);
+        covered.addAll(singleValueD);
+        // test membership, not sizes: equal sizes could hide an uncovered cuboid column when
+        // groupD/singleValueD contain a column outside the cuboid
+        if (covered.containsAll(cuboid.getColumns())) {
+            logger.info("Does not need storage aggregation");
+            return false;
+        } else {
+            logger.info("Need storage aggregation");
+            return true;
+        }
     }
//exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive
@@ -268,7 +282,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (exact) {
- logger.info("exactAggregation is true");
+ logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
}
return exact;
}
@@ -355,7 +369,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+ private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
boolean hasMemHungryMeasure = false;
for (FunctionDesc func : metrics) {
hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
@@ -381,7 +395,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+ private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
boolean possible = true;
boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());