You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/08/10 04:56:57 UTC
[2/2] kylin git commit: KYLIN-1936 improve enable limit logic
(exactAggregation is too strict)
KYLIN-1936 improve enable limit logic (exactAggregation is too strict)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/28e94230
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/28e94230
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/28e94230
Branch: refs/heads/master
Commit: 28e942306d545ff3b5f604e8225bacad650ad8ac
Parents: c67891d
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Aug 5 12:50:27 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Aug 10 12:56:32 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 6 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 3 +-
.../cube/inmemcubing/DoggedCubeBuilder.java | 4 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 9 +-
.../kylin/gridtable/GTAggregateScanner.java | 38 ++++-
.../java/org/apache/kylin/gridtable/GTInfo.java | 2 +-
.../org/apache/kylin/gridtable/GTRecord.java | 33 +++-
.../GTScanExceedThresholdException.java | 26 +++
.../kylin/gridtable/GTScanRangePlanner.java | 2 +-
.../apache/kylin/gridtable/GTScanRequest.java | 92 +++++-----
.../kylin/gridtable/GTScanRequestBuilder.java | 97 +++++++++++
.../kylin/gridtable/GTScanTimeoutException.java | 26 +++
.../gridtable/benchmark/GTScannerBenchmark.java | 5 +-
.../benchmark/GTScannerBenchmark2.java | 5 +-
.../inmemcubing/ConcurrentDiskStoreTest.java | 4 +-
.../cube/inmemcubing/MemDiskStoreTest.java | 4 +-
.../gridtable/AggregationCacheSpillTest.java | 12 +-
.../kylin/gridtable/DictGridTableTest.java | 10 +-
.../kylin/gridtable/SimpleGridTableTest.java | 4 +-
.../org/apache/kylin/metadata/tuple/ITuple.java | 2 +-
.../org/apache/kylin/metadata/tuple/Tuple.java | 5 +
.../apache/kylin/storage/StorageContext.java | 4 +
.../storage/gtrecord/CubeSegmentScanner.java | 8 +-
.../storage/gtrecord/CubeTupleConverter.java | 148 +++++++++++-----
.../gtrecord/FetchSourceAwareIterator.java | 24 +++
.../gtrecord/GTCubeStorageQueryBase.java | 47 +++++-
.../storage/gtrecord/IFetchSourceAware.java | 25 +++
.../kylin/storage/gtrecord/PeekingImpl.java | 73 ++++++++
.../gtrecord/SegmentCubeTupleIterator.java | 162 ++++++++++++++++++
.../gtrecord/SequentialCubeTupleIterator.java | 151 ++++-------------
.../storage/gtrecord/SortedIteratorMerger.java | 100 +++++++++++
.../gtrecord/SortedIteratorMergerWithLimit.java | 143 ++++++++++++++++
.../gtrecord/SortedIteratorMergerTest.java | 97 +++++++++++
.../SortedIteratorMergerWithLimitTest.java | 127 ++++++++++++++
.../apache/kylin/query/HackedDbUnitAssert.java | 169 +++++++++++++++++++
.../apache/kylin/query/ITKylinQueryTest.java | 12 +-
.../org/apache/kylin/query/KylinTestBase.java | 72 ++++++++
.../hbase/cube/HBaseScannerBenchmark.java | 6 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 165 +-----------------
.../hbase/cube/v2/ExpectedSizeIterator.java | 116 +++++++++++++
.../storage/hbase/cube/v2/GTBlobScatter.java | 150 ++++++++++++++++
.../coprocessor/endpoint/CubeVisitService.java | 70 ++++----
42 files changed, 1811 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f51dce6..eb4102a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -23,9 +23,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.SortedSet;
+import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -470,6 +470,10 @@ abstract public class KylinConfigBase implements Serializable {
return Float.parseFloat(getOptional("kylin.hbase.hfile.size.gb", "2.0"));
}
+ public int getStoragePushDownLimitMax() {
+ return Integer.parseInt(getOptional("kylin.query.pushdown.limit.max", "10000"));
+ }
+
public int getScanThreshold() {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index e385ab9..d417d11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.TblColRef;
@@ -83,7 +84,7 @@ abstract public class AbstractInMemCubeBuilder {
protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = gridTable.scan(req);
for (GTRecord record : scanner) {
output.write(cuboidId, record);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 606c820..15f2241 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -35,7 +35,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.TblColRef;
@@ -399,7 +399,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
if (cuboidIterator.hasNext()) {
CuboidResult cuboid = cuboidIterator.next();
currentCuboidId = cuboid.cuboidId;
- scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+ scanner = cuboid.table.scan(new GTScanRequestBuilder().setInfo(cuboid.table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
recordIterator = scanner.iterator();
} else {
return false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e12e815..36d1296 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -31,8 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.topn.Counter;
@@ -329,8 +330,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
- GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
- GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+ GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, Long.MAX_VALUE);
aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
int count = 0;
@@ -397,7 +398,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
GTInfo info = gridTable.getInfo();
- GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
// for child cuboid, some measures don't need aggregation.
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cb23af4..ccf4895 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -63,24 +63,28 @@ public class GTAggregateScanner implements IGTScanner {
final IGTScanner inputScanner;
final AggregationCache aggrCache;
final long spillThreshold;
+ final int storagePushDownLimit;//default to be Int.MAX
+ final long deadline;
private int aggregatedRowCount = 0;
private MemoryWaterLevel memTracker;
private boolean[] aggrMask;
- public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline) {
if (!req.hasAggregation())
throw new IllegalStateException();
this.info = inputScanner.getInfo();
- this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+ this.dimensions = req.getDimensions();
this.groupBy = req.getAggrGroupBy();
this.metrics = req.getAggrMetrics();
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
this.aggrCache = new AggregationCache();
- this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+ this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
this.aggrMask = new boolean[metricsAggrFuncs.length];
+ this.storagePushDownLimit = req.getStoragePushDownLimit();
+ this.deadline = deadline;
Arrays.fill(aggrMask, true);
}
@@ -133,8 +137,25 @@ public class GTAggregateScanner implements IGTScanner {
public Iterator<GTRecord> iterator() {
long count = 0;
for (GTRecord r : inputScanner) {
+
count++;
- aggrCache.aggregate(r);
+
+ if (getNumOfSpills() == 0) {
+ //check limit
+ boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
+
+ if (!ret) {
+ logger.info("abort reading inputScanner because storage push down limit is hit");
+ break;//limit is hit
+ }
+ } else {//spills already exist, which means many rows need to be aggregated, so the limit clause is unlikely to help
+ aggrCache.aggregate(r, Integer.MAX_VALUE);
+ }
+
+ //check deadline
+ if (count % 10000 == 1 && System.currentTimeMillis() > deadline) {
+ throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
+ }
}
logger.info("GTAggregateScanner input rows: " + count);
return aggrCache.iterator();
@@ -241,7 +262,7 @@ public class GTAggregateScanner implements IGTScanner {
return result;
}
- void aggregate(GTRecord r) {
+ boolean aggregate(GTRecord r, int stopForLimit) {
if (++aggregatedRowCount % 100000 == 0) {
if (memTracker != null) {
memTracker.markHigh();
@@ -257,6 +278,12 @@ public class GTAggregateScanner implements IGTScanner {
final byte[] key = createKey(r);
MeasureAggregator[] aggrs = aggBufMap.get(key);
if (aggrs == null) {
+
+ //for storage push down limit
+ if (aggBufMap.size() >= stopForLimit) {
+ return false;
+ }
+
aggrs = newAggregators();
aggBufMap.put(key, aggrs);
}
@@ -267,6 +294,7 @@ public class GTAggregateScanner implements IGTScanner {
aggrs[i].aggregate(metrics);
}
}
+ return true;
}
private void spillBuffMap() throws RuntimeException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 673d22e..21da4ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -151,7 +151,7 @@ public class GTInfo {
if (!expected.equals(ref))
throw new IllegalArgumentException();
}
-
+
void validate() {
if (codeSystem == null)
throw new IllegalStateException();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 37f42c7..4d26029 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,6 +20,7 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import org.apache.kylin.common.util.ByteArray;
@@ -27,7 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import com.google.common.base.Preconditions;
-public class GTRecord implements Comparable<GTRecord> {
+public class GTRecord implements Comparable<GTRecord>, Cloneable {
final transient GTInfo info;
final ByteArray[] cols;
@@ -54,6 +55,11 @@ public class GTRecord implements Comparable<GTRecord> {
}
}
+ @Override
+ public Object clone() {
+ return new GTRecord(this);
+ }
+
public GTInfo getInfo() {
return info;
}
@@ -189,12 +195,33 @@ public class GTRecord implements Comparable<GTRecord> {
@Override
public int compareTo(GTRecord o) {
+ return compareToInternal(o, info.colAll);
+ }
+
+ public int compareToOnPrimaryKey(GTRecord o) {
+ return compareToInternal(o, info.primaryKey);
+ }
+
+ public static Comparator<GTRecord> getPrimaryKeyComparator() {
+ return new Comparator<GTRecord>() {
+ @Override
+ public int compare(GTRecord o1, GTRecord o2) {
+ if (o1 == null || o2 == null) {
+ throw new IllegalStateException("Cannot handle null");
+ }
+
+ return o1.compareToOnPrimaryKey(o2);
+ }
+ };
+ }
+
+ private int compareToInternal(GTRecord o, ImmutableBitSet participateCols) {
assert this.info == o.info; // reference equal for performance
IGTComparator comparator = info.codeSystem.getComparator();
int comp = 0;
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
+ for (int i = 0; i < participateCols.trueBitCount(); i++) {
+ int c = participateCols.trueBitAt(i);
comp = comparator.compare(cols[c], o.cols[c]);
if (comp != 0)
return comp;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
new file mode 100644
index 0000000..dd57e90
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanExceedThresholdException extends RuntimeException {
+
+ public GTScanExceedThresholdException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 17d27f9..b8f7e0e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -165,7 +165,7 @@ public class GTScanRangePlanner {
GTScanRequest scanRequest;
List<GTScanRange> scanRanges = this.planScanRanges();
if (scanRanges != null && scanRanges.size() != 0) {
- scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
+ scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest();
} else {
scanRequest = null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 0ce6b4c..e2bac3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -57,20 +57,14 @@ public class GTScanRequest {
private String[] aggrMetricsFuncs;
// hint to storage behavior
- private boolean allowPreAggregation = true;
- private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
- this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
- }
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
- ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
- this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
- }
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
- ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
+ private boolean allowStorageAggregation;
+ private double aggCacheMemThreshold;
+ private int storageScanRowNumThreshold;
+ private int storagePushDownLimit;
+
+ GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+ ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
+ double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
this.info = info;
if (ranges == null) {
this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -84,8 +78,10 @@ public class GTScanRequest {
this.aggrMetrics = aggrMetrics;
this.aggrMetricsFuncs = aggrMetricsFuncs;
- this.allowPreAggregation = allowPreAggregation;
- this.aggrCacheGB = aggrCacheGB;
+ this.allowStorageAggregation = allowStorageAggregation;
+ this.aggCacheMemThreshold = aggCacheMemThreshold;
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ this.storagePushDownLimit = storagePushDownLimit;
validate(info);
}
@@ -143,7 +139,7 @@ public class GTScanRequest {
}
public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
- return decorateScanner(scanner, true, true);
+ return decorateScanner(scanner, true, true, Long.MAX_VALUE);
}
/**
@@ -152,7 +148,7 @@ public class GTScanRequest {
* <p/>
* Refer to CoprocessorBehavior for explanation
*/
- public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
+ public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, long deadline) throws IOException {
IGTScanner result = scanner;
if (!doFilter) { //Skip reading this section if you're not profiling!
int scanned = lookAndForget(result);
@@ -169,11 +165,11 @@ public class GTScanRequest {
return new EmptyGTScanner(scanned);
}
- if (!this.allowPreAggregation) {
+ if (!this.isAllowStorageAggregation()) {
logger.info("pre aggregation is not beneficial, skip it");
} else if (this.hasAggregation()) {
logger.info("pre aggregating results before returning");
- result = new GTAggregateScanner(result, this);
+ result = new GTAggregateScanner(result, this, deadline);
} else {
logger.info("has no aggregation, skip it");
}
@@ -235,6 +231,10 @@ public class GTScanRequest {
return filterPushDown;
}
+ public ImmutableBitSet getDimensions() {
+ return this.getColumns().andNot(this.getAggrMetrics());
+ }
+
public ImmutableBitSet getAggrGroupBy() {
return aggrGroupBy;
}
@@ -247,34 +247,41 @@ public class GTScanRequest {
return aggrMetricsFuncs;
}
- public boolean isAllowPreAggregation() {
- return allowPreAggregation;
+ public boolean isAllowStorageAggregation() {
+ return allowStorageAggregation;
}
- public void setAllowPreAggregation(boolean allowPreAggregation) {
- this.allowPreAggregation = allowPreAggregation;
+ public void setAllowStorageAggregation(boolean allowStorageAggregation) {
+ this.allowStorageAggregation = allowStorageAggregation;
}
- public double getAggrCacheGB() {
- if (aggrCacheGB < 0)
+ public double getAggCacheMemThreshold() {
+ if (aggCacheMemThreshold < 0)
return 0;
else
- return aggrCacheGB;
+ return aggCacheMemThreshold;
}
- public void setAggrCacheGB(double gb) {
- this.aggrCacheGB = gb;
+ public void setAggCacheMemThreshold(double gb) {
+ this.aggCacheMemThreshold = gb;
}
- public int getRowLimit() {
- if (aggrCacheGB < 0)
- return (int) -aggrCacheGB;
- else
- return 0;
+ public int getStorageScanRowNumThreshold() {
+ return storageScanRowNumThreshold;
+ }
+
+ public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+ logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold);
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ }
+
+ public int getStoragePushDownLimit() {
+ return this.storagePushDownLimit;
}
- public void setRowLimit(int limit) {
- aggrCacheGB = -limit;
+ public void setStoragePushDownLimit(int limit) {
+ logger.info("storagePushDownLimit is set to " + limit); // log the incoming value; the field still holds the previous limit at this point
+ this.storagePushDownLimit = limit;
}
public List<Integer> getRequiredMeasures() {
@@ -323,8 +330,10 @@ public class GTScanRequest {
ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
- BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
- out.putDouble(value.aggrCacheGB);
+ BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out);
+ out.putDouble(value.aggCacheMemThreshold);
+ BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
+ BytesUtil.writeVInt(value.storagePushDownLimit, out);
}
@Override
@@ -353,8 +362,13 @@ public class GTScanRequest {
String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
double sAggrCacheGB = in.getDouble();
+ int storageScanRowNumThreshold = BytesUtil.readVInt(in);
+ int storagePushDownLimit = BytesUtil.readVInt(in);
- return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+ return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
+ setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
+ setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
+ setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
}
private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
new file mode 100644
index 0000000..49ec759
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+public class GTScanRequestBuilder {
+ private GTInfo info;
+ private List<GTScanRange> ranges;
+ private TupleFilter filterPushDown;
+ private ImmutableBitSet dimensions;
+ private ImmutableBitSet aggrGroupBy = null;
+ private ImmutableBitSet aggrMetrics = null;
+ private String[] aggrMetricsFuncs = null;
+ private boolean allowStorageAggregation = true;
+ private double aggCacheMemThreshold = 0;
+ private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.
+ private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $storagePushDownLimit aggregated rows are produced.
+
+ public GTScanRequestBuilder setInfo(GTInfo info) {
+ this.info = info;
+ return this;
+ }
+
+ public GTScanRequestBuilder setRanges(List<GTScanRange> ranges) {
+ this.ranges = ranges;
+ return this;
+ }
+
+ public GTScanRequestBuilder setFilterPushDown(TupleFilter filterPushDown) {
+ this.filterPushDown = filterPushDown;
+ return this;
+ }
+
+ public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
+ this.dimensions = dimensions;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrGroupBy(ImmutableBitSet aggrGroupBy) {
+ this.aggrGroupBy = aggrGroupBy;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrMetrics(ImmutableBitSet aggrMetrics) {
+ this.aggrMetrics = aggrMetrics;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrMetricsFuncs(String[] aggrMetricsFuncs) {
+ this.aggrMetricsFuncs = aggrMetricsFuncs;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAllowStorageAggregation(boolean allowStorageAggregation) {
+ this.allowStorageAggregation = allowStorageAggregation;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggCacheMemThreshold(double aggCacheMemThreshold) {
+ this.aggCacheMemThreshold = aggCacheMemThreshold;
+ return this;
+ }
+
+ public GTScanRequestBuilder setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ return this;
+ }
+
+ public GTScanRequestBuilder setStoragePushDownLimit(int storagePushDownLimit) {
+ this.storagePushDownLimit = storagePushDownLimit;
+ return this;
+ }
+
+ public GTScanRequest createGTScanRequest() {
+ return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
new file mode 100644
index 0000000..e92dae3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanTimeoutException extends RuntimeException {
+
+ public GTScanTimeoutException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 680ba33..589f37c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTSampleCodeSystem;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -110,7 +111,7 @@ public class GTScannerBenchmark {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
@@ -154,7 +155,7 @@ public class GTScannerBenchmark {
@SuppressWarnings("unused")
private void testFilter(TupleFilter filter) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index ae86e46..40a5e01 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTSampleCodeSystem;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
@@ -132,7 +133,7 @@ public class GTScannerBenchmark2 {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
@@ -176,7 +177,7 @@ public class GTScannerBenchmark2 {
@SuppressWarnings("unused")
private void testFilter(TupleFilter filter) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
index ceaf80d..6355f4a 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.UnitTestSupport;
@@ -84,7 +84,7 @@ public class ConcurrentDiskStoreTest extends LocalFileMetadataTestCase {
t[i] = new Thread() {
public void run() {
try {
- IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo(), null, null, null));
+ IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int i = 0;
for (GTRecord r : scanner) {
assertEquals(data.get(i++), r);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index 807a6e3..06ade1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.UnitTestSupport;
@@ -100,7 +100,7 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
}
builder.close();
- IGTScanner scanner = table.scan(new GTScanRequest(info, null, null, null));
+ IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int i = 0;
for (GTRecord r : scanner) {
assertEquals(data.get(i++), r);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index 4160f86..b5f6de7 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -84,10 +84,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
}
};
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
- scanRequest.setAggrCacheGB(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+ scanRequest.setAggCacheMemThreshold(0.5);
- GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
int count = 0;
for (GTRecord record : scanner) {
@@ -127,10 +127,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
};
// all-in-mem testcase
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
- scanRequest.setAggrCacheGB(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+ scanRequest.setAggCacheMemThreshold(0.5);
- GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
int count = 0;
for (GTRecord record : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index af39e21..74338af 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -256,7 +256,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@Test
public void verifyFirstRow() throws IOException {
- doScanAndVerify(table, new GTScanRequest(table.getInfo(), null, null, null), "[1421193600000, 30, Yang, 10, 10.5]", //
+ doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
"[1421193600000, 30, Luke, 10, 10.5]", //
"[1421280000000, 20, Dong, 10, 10.5]", //
"[1421280000000, 20, Jason, 10, 10.5]", //
@@ -276,7 +276,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
- Assert.assertEquals(origin.getAggrCacheGB(), sGTScanRequest.getAggrCacheGB(), 0.01);
+ Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
return sGTScanRequest;
}
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
// note the unEvaluatable column 1 in filter is added to group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
LogicalTupleFilter filter = and(fComp1, fComp2);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
// note the evaluatable column 1 in filter is added to returned columns but not in group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -334,7 +334,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@SuppressWarnings("unused")
private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
long start = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, null, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = table.scan(req);
int i = 0;
for (GTRecord r : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 2abe928..fd571d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -90,7 +90,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
}
private IGTScanner scan(GridTable table) throws IOException {
- GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = table.scan(req);
for (GTRecord r : scanner) {
Object[] v = r.getValues();
@@ -106,7 +106,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
}
private IGTScanner scanAndAggregate(GridTable table) throws IOException {
- GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0, 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[]{"count", "sum"}).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = table.scan(req);
int i = 0;
for (GTRecord r : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
index 7d401ec..742cfba 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
*
* @author yangli9
*/
-public interface ITuple extends IEvaluatableTuple {
+public interface ITuple extends IEvaluatableTuple, Cloneable {
List<String> getAllFields();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index 14d717e..54e5786 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -55,6 +55,11 @@ public class Tuple implements ITuple {
}
@Override
+ public Object clone() {
+ return makeCopy();
+ }
+
+ @Override
public ITuple makeCopy() {
Tuple ret = new Tuple(this.info);
for (int i = 0; i < this.values.length; ++i) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 90a2e43..acb4960 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -104,6 +104,10 @@ public class StorageContext {
return this.enableLimit;
}
+ public int getStoragePushDownLimit() {
+ return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
+ }
+
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9ca53f9..83ee6c7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -69,10 +69,10 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
if (scanRequest != null) {
- scanRequest.setAllowPreAggregation(context.isNeedStorageAggregation());
- scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
- if (context.isLimitEnabled())
- scanRequest.setRowLimit(context.getLimit());
+ scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation());
+ scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: divide by shard number?
+ scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit());
}
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 68556d6..0f96e3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -18,10 +18,11 @@
package org.apache.kylin.storage.gtrecord;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Dictionary;
@@ -36,9 +37,11 @@ import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -62,8 +65,10 @@ public class CubeTupleConverter {
final int nSelectedDims;
+ final int[] dimensionIndexOnTuple;
+
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = returnTupleInfo;
@@ -83,6 +88,20 @@ public class CubeTupleConverter {
advMeasureFillers = Lists.newArrayListWithCapacity(1);
advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+ // dimensionIndexOnTuple is for SQL with limit
+ List<Integer> temp = Lists.newArrayList();
+ for (TblColRef dim : cuboid.getColumns()) {
+ if (tupleInfo.hasColumn(dim)) {
+ temp.add(tupleInfo.getColumnIndex(dim));
+ }
+ }
+ dimensionIndexOnTuple = new int[temp.size()];
+ for (int i = 0; i < temp.size(); i++) {
+ dimensionIndexOnTuple[i] = temp.get(i);
+ }
+
+ ////////////
+
int iii = 0;
// pre-calculate dimension index mapping to tuple
@@ -90,6 +109,11 @@ public class CubeTupleConverter {
int i = mapping.getIndexOf(dim);
gtColIdx[iii] = i;
tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+
+ // if (tupleIdx[iii] == -1) {
+ // throw new IllegalStateException("dim not used in tuple:" + dim);
+ // }
+
iii++;
}
@@ -131,6 +155,44 @@ public class CubeTupleConverter {
}
}
+ public Comparator<ITuple> getTupleDimensionComparator() {
+ return new Comparator<ITuple>() {
+ @Override
+ public int compare(ITuple o1, ITuple o2) {
+ Preconditions.checkNotNull(o1);
+ Preconditions.checkNotNull(o2);
+ for (int i = 0; i < dimensionIndexOnTuple.length; i++) {
+ int index = dimensionIndexOnTuple[i];
+
+ if (index == -1) {
+ //TODO:
+ continue;
+ }
+
+ Comparable a = (Comparable) o1.getAllValues()[index];
+ Comparable b = (Comparable) o2.getAllValues()[index];
+
+ if (a == null && b == null) {
+ continue;
+ } else if (a == null) {
+ return 1;
+ } else if (b == null) {
+ return -1;
+ } else {
+ int temp = a.compareTo(b);
+ if (temp != 0) {
+ return temp;
+ } else {
+ continue;
+ }
+ }
+ }
+
+ return 0;
+ }
+ };
+ }
+
// load only needed dictionaries
private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
@@ -201,55 +263,55 @@ public class CubeTupleConverter {
return null;
switch (deriveInfo.type) {
- case LOOKUP:
- return new IDerivedColumnFiller() {
- CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
- LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
- int[] derivedColIdx = initDerivedColIdx();
- Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
- private int[] initDerivedColIdx() {
- int[] idx = new int[deriveInfo.columns.length];
- for (int i = 0; i < idx.length; i++) {
- idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ case LOOKUP:
+ return new IDerivedColumnFiller() {
+ CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+ LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+ int[] derivedColIdx = initDerivedColIdx();
+ Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+ private int[] initDerivedColIdx() {
+ int[] idx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < idx.length; i++) {
+ idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ }
+ return idx;
}
- return idx;
- }
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- for (int i = 0; i < hostTmpIdx.length; i++) {
- lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
- }
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ for (int i = 0; i < hostTmpIdx.length; i++) {
+ lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+ }
- String[] lookupRow = lookupTable.getRow(lookupKey);
+ String[] lookupRow = lookupTable.getRow(lookupKey);
- if (lookupRow != null) {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- String value = lookupRow[derivedColIdx[i]];
- tuple.setDimensionValue(derivedTupleIdx[i], value);
+ if (lookupRow != null) {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ String value = lookupRow[derivedColIdx[i]];
+ tuple.setDimensionValue(derivedTupleIdx[i], value);
+ }
}
- }
- } else {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- tuple.setDimensionValue(derivedTupleIdx[i], null);
+ } else {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ tuple.setDimensionValue(derivedTupleIdx[i], null);
+ }
}
}
}
- }
- };
- case PK_FK:
- return new IDerivedColumnFiller() {
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
- tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
- }
- };
- default:
- throw new IllegalArgumentException();
+ };
+ case PK_FK:
+ return new IDerivedColumnFiller() {
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+ tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+ }
+ };
+ default:
+ throw new IllegalArgumentException();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
new file mode 100644
index 0000000..cb83819
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+interface FetchSourceAwareIterator<F> extends IFetchSourceAware<F>, Iterator<F> {
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 7acf186..ae5240b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -101,13 +101,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(exactAggregation);
- context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
+ context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
+ enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
- setLimit(filter, context);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -227,6 +227,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return !isExactAggregation;
}
+ //exact aggregation was introduced back when we had some measures (like holistic distinct count) that were sensitive
+ //to post aggregation. Now that we don't have such measures any more, isExactAggregation should be useless (at least in v2 storage and above)
public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
boolean exact = true;
@@ -372,11 +374,44 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void setLimit(TupleFilter filter, StorageContext context) {
- boolean goodAggr = context.isExactAggregation();
+ private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+ boolean possible = true;
+
boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
- boolean goodSort = context.hasSort() == false;
- if (goodAggr && goodFilter && goodSort) {
+ if (!goodFilter) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+ }
+
+ boolean goodSort = !context.hasSort();
+ if (!goodSort) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because the query has order by");
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (!groups.containsAll(derivedPostAggregation)) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because derived column require post aggregation: " + derivedPostAggregation);
+ }
+
+ //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
+ int size = groupsD.size();
+ if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD //
+ + " with cuboid columns: " + cuboid.getColumns());
+ }
+
+ //if exists measures like max(cal_dt), then it's not a perfect cuboid match, cannot apply limit
+ for (FunctionDesc functionDesc : functionDescs) {
+ if (functionDesc.isDimensionAsMetric()) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc);
+ }
+ }
+
+ if (possible) {
logger.info("Enable limit " + context.getLimit());
context.enableLimit();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
new file mode 100644
index 0000000..d51ca4c
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+public interface IFetchSourceAware<E> { // contract for exposing an iterator over E (or a subtype); implementors are not visible in this part of the diff
+ public Iterator<? extends E> getFetchSource(); // by its name, the iterator this object fetches elements from -- TODO confirm against implementors
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
new file mode 100644
index 0000000..96a232f
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+
+/**
+ * Copied from Guava's peeking iterator implementation, with the wrapped iterator field's access modifier changed to public.
+ *
+ * Implementation of PeekingIterator that avoids peeking unless necessary: the wrapped iterator is read ahead only when peek() is called.
+ */
+class PeekingImpl<E> implements PeekingIterator<E> {
+
+ public final Iterator<? extends E> iterator; // wrapped source iterator; public so code outside this class can reach it directly
+ private boolean hasPeeked; // true while peekedElement holds an element already pulled from the source
+ private E peekedElement; // element buffered by peek(); reset to null once next() hands it out
+
+ public PeekingImpl(Iterator<? extends E> iterator) {
+ this.iterator = checkNotNull(iterator); // null source is rejected at construction time
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasPeeked || iterator.hasNext(); // a buffered element counts as a pending next element
+ }
+
+ @Override
+ public E next() {
+ if (!hasPeeked) { // nothing buffered: read straight from the source
+ return iterator.next();
+ }
+ E result = peekedElement; // hand out the element buffered by peek()
+ hasPeeked = false;
+ peekedElement = null; // drop the buffered reference
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ checkState(!hasPeeked, "Can't remove after you've peeked at next"); // after peek() the source has advanced one element, so source.remove() would hit the wrong element
+ iterator.remove();
+ }
+
+ @Override
+ public E peek() {
+ if (!hasPeeked) { // only the first peek() after a next() touches the source
+ peekedElement = iterator.next(); // pull one element ahead and buffer it
+ hasPeeked = true;
+ }
+ return peekedElement; // repeated peek() calls return the same buffered element
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
new file mode 100644
index 0000000..61267ae
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SegmentCubeTupleIterator implements ITupleIterator { // walks the GTRecords of one CubeSegmentScanner and converts each record into one or more tuples
+
+ private static final Logger logger = LoggerFactory.getLogger(SegmentCubeTupleIterator.class);
+
+ protected final CubeSegmentScanner scanner; // record source; closed by close()
+ protected final Cuboid cuboid;
+ protected final Set<TblColRef> selectedDimensions;
+ protected final Set<FunctionDesc> selectedMetrics;
+ protected final TupleInfo tupleInfo;
+ protected final Tuple tuple; // the single Tuple instance every record is translated into; next() returns this same object each time
+ protected final StorageContext context;
+
+ protected Iterator<GTRecord> gtItr; // record iterator obtained from scanner.iterator()
+ protected CubeTupleConverter cubeTupleConverter; // translates a GTRecord into the shared tuple
+ protected Tuple next; // non-null when a tuple is prepared and not yet consumed by next()
+
+ private List<IAdvMeasureFiller> advMeasureFillers; // fillers returned by the latest translateResult call; null in the simple case
+ private int advMeasureRowsRemaining; // rows still to be emitted from the current fillers
+ private int advMeasureRowIndex; // row index handed to fillTuple for the next emitted row
+
+ public SegmentCubeTupleIterator(CubeSegmentScanner scanner, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+ Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+ this.scanner = scanner;
+ this.cuboid = cuboid;
+ this.selectedDimensions = selectedDimensions;
+ this.selectedMetrics = selectedMetrics;
+ this.tupleInfo = returnTupleInfo;
+ this.tuple = new Tuple(returnTupleInfo);
+ this.context = context;
+ this.gtItr = getGTItr(scanner);
+ this.cubeTupleConverter = new CubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ }
+
+ private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) { // returns the scanner's own record iterator
+ return scanner.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null) // a tuple is already prepared and waiting to be consumed
+ return true;
+
+ // consume any left rows from advanced measure filler
+ if (advMeasureRowsRemaining > 0) {
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ filler.fillTuple(tuple, advMeasureRowIndex); // each filler writes its part of row advMeasureRowIndex into the shared tuple
+ }
+ advMeasureRowIndex++;
+ advMeasureRowsRemaining--;
+ next = tuple;
+ return true;
+ }
+
+ // no pending filler rows: pull the next GTRecord, if there is one
+ if (!gtItr.hasNext()) {
+ return false;
+ }
+ GTRecord curRecord = gtItr.next();
+
+ Preconditions.checkNotNull(cubeTupleConverter);
+
+ // translate into tuple
+ advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+
+ // the simple case
+ if (advMeasureFillers == null) { // no fillers returned: the record maps to exactly one tuple
+ next = tuple;
+ return true;
+ }
+
+ // advanced measure filling, like TopN, will produce multiple tuples out of one record
+ advMeasureRowsRemaining = -1; // sentinel: row count not yet read from any filler
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ if (advMeasureRowsRemaining < 0)
+ advMeasureRowsRemaining = filler.getNumOfRows();
+ if (advMeasureRowsRemaining != filler.getNumOfRows()) // all fillers must report the same number of rows
+ throw new IllegalStateException();
+ }
+ if (advMeasureRowsRemaining < 0) // still the sentinel: the filler list was empty
+ throw new IllegalStateException();
+
+ advMeasureRowIndex = 0;
+ return hasNext(); // re-enter: emits the first filler row, or moves on to the next record when the fillers report zero rows
+ }
+
+ @Override
+ public ITuple next() {
+ // fetch next record
+ if (next == null) {
+ hasNext(); // prepares next when more data is available
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+
+ ITuple result = next;
+ next = null; // mark as consumed so the following hasNext() advances
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(); // removal is not supported by this iterator
+ }
+
+ @Override
+ public void close() {
+ close(scanner);
+ }
+
+ protected void close(CubeSegmentScanner scanner) {
+ try {
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("Exception when close CubeScanner", e); // close failure is logged and not rethrown
+ }
+ }
+
+ public CubeTupleConverter getCubeTupleConverter() {
+ return cubeTupleConverter;
+ }
+}