You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/12 11:48:49 UTC
[17/48] kylin git commit: KYLIN-2483 SortedIteratorMergerWithLimit
could be slower when number of total merge rows is small
KYLIN-2483 SortedIteratorMergerWithLimit could be slower when number of total merge rows is small
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fc68437
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fc68437
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fc68437
Branch: refs/heads/master-hbase0.98
Commit: 3fc6843736f877712cd24ed8cafba4554b5e4d31
Parents: dd7507a
Author: Hongbin Ma <ma...@apache.org>
Authored: Sun Mar 5 22:44:14 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Mar 5 22:44:20 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 5 ++-
.../apache/kylin/storage/StorageContext.java | 45 +++++++++++++++-----
.../gtrecord/SequentialCubeTupleIterator.java | 21 +++------
.../gtrecord/StorageResponseGTScatter.java | 7 +--
.../org/apache/kylin/query/KylinTestBase.java | 2 +-
5 files changed, 50 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index df3e914..3a87d3d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -773,7 +773,6 @@ abstract public class KylinConfigBase implements Serializable {
// ENGINE.SPARK
// ============================================================================
-
public String getHadoopConfDir() {
return getOptional("kylin.env.hadoop-conf-dir", "");
}
@@ -817,6 +816,10 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
}
+ public int getMergeSortPartitionResultsMinLimit() {
+ return Integer.parseInt(getOptional("kylin.query.merge-sort-partition-results.min-limit", "100"));
+ }
+
public long getQueryMaxScanBytes() {
long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
return value > 0 ? value : Long.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index f58fb34..f4211ff 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,9 +20,11 @@ package org.apache.kylin.storage;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +47,6 @@ public class StorageContext {
private boolean exactAggregation = false;
private boolean needStorageAggregation = false;
- private boolean limitEnabled = false;
private boolean enableCoprocessor = false;
private IStorageQuery storageQuery;
@@ -63,7 +64,9 @@ public class StorageContext {
this.connUrl = connUrl;
}
- public int getLimit() {
+ //the limit here corresponds to the limit concept in SQL;
+ //it also takes into consideration Statement.setMaxRows in JDBC
+ private int getLimit() {
if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null || BackdoorToggles.getStatementMaxRows() == 0) {
return limit;
} else {
@@ -71,10 +74,6 @@ public class StorageContext {
}
}
- public void setOverlookOuterLimit() {
- this.overlookOuterLimit = true;
- }
-
public void setLimit(int l) {
if (limit != Integer.MAX_VALUE) {
logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l, limit);
@@ -83,6 +82,12 @@ public class StorageContext {
}
}
+ //the outer limit is something like Statement.setMaxRows in JDBC
+ public void setOverlookOuterLimit() {
+ this.overlookOuterLimit = true;
+ }
+
+ //the offset here corresponds to the offset concept in SQL
public int getOffset() {
return offset;
}
@@ -91,8 +96,18 @@ public class StorageContext {
this.offset = offset;
}
- public boolean isLimitEnabled() {
- return this.limitEnabled;
+ /**
+ * In contrast to the SQL concept of limit, "limit push down" means
+ * whether the limit is effective at the storage level. Some queries cannot
+ * leverage the limit clause; check out
+ * {@link GTCubeStorageQueryBase#enableStorageLimitIfPossible(org.apache.kylin.cube.cuboid.Cuboid, java.util.Collection, java.util.Set, java.util.Collection, org.apache.kylin.metadata.filter.TupleFilter, java.util.Set, java.util.Collection, org.apache.kylin.storage.StorageContext)}
+ */
+ public boolean isLimitPushDownEnabled() {
+ return isValidPushDownLimit(finalPushDownLimit);
+ }
+
+ public static boolean isValidPushDownLimit(int finalPushDownLimit) {
+ return finalPushDownLimit < Integer.MAX_VALUE && finalPushDownLimit > 0;
}
public int getFinalPushDownLimit() {
@@ -101,7 +116,7 @@ public class StorageContext {
public void setFinalPushDownLimit(IRealization realization) {
- if (this.getLimit() == Integer.MAX_VALUE) {
+ if (!isValidPushDownLimit(this.getLimit())) {
return;
}
@@ -110,12 +125,20 @@ public class StorageContext {
if (!realization.supportsLimitPushDown()) {
logger.warn("Not enabling limit push down because cube storage type not supported");
} else {
- this.limitEnabled = true;
this.finalPushDownLimit = tempPushDownLimit;
- logger.info("Enable limit: " + tempPushDownLimit);
+ logger.info("Enable limit (storage push down limit) :" + tempPushDownLimit);
}
}
+ public boolean mergeSortPartitionResults() {
+ return mergeSortPartitionResults(finalPushDownLimit);
+ }
+
+ public static boolean mergeSortPartitionResults(int finalPushDownLimit) {
+ return isValidPushDownLimit(finalPushDownLimit) && //
+ (finalPushDownLimit > KylinConfig.getInstanceFromEnv().getMergeSortPartitionResultsMinLimit());
+ }
+
public long getDeadline() {
return this.deadline;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 14b6394..9d5d816 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import javax.annotation.Nullable;
-
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -37,7 +35,6 @@ import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -64,19 +61,15 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
}
- if (!context.isLimitEnabled()) {
- //normal case
- tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
- } else {
+ if (context.mergeSortPartitionResults()) {
//query with limit
- Iterator<Iterator<ITuple>> transformed = Iterators.transform(segmentCubeTupleIterators.iterator(), new Function<SegmentCubeTupleIterator, Iterator<ITuple>>() {
- @Nullable
- @Override
- public Iterator<ITuple> apply(@Nullable SegmentCubeTupleIterator input) {
- return input;
- }
- });
+ logger.info("Using SortedIteratorMergerWithLimit to merge segment results");
+ Iterator<Iterator<ITuple>> transformed = (Iterator<Iterator<ITuple>>) (Iterator<?>) segmentCubeTupleIterators.iterator();
tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator();
+ } else {
+ //normal case
+ logger.info("Using Iterators.concat to merge segment results");
+ tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 3904b5c..1a80bbf 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +70,11 @@ public class StorageResponseGTScatter implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
- if (storagePushDownLimit != Integer.MAX_VALUE) {
- logger.info("Using SortedIteratorMergerWithLimit to merge partitions");
+ if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
+ logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
} else {
- logger.info("Using Iterators.concat to merge partitions");
+ logger.info("Using Iterators.concat to merge partition results");
return Iterators.concat(shardSubsets);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index d0524c6..2174094 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -682,7 +682,7 @@ public class KylinTestBase {
protected boolean checkFinalPushDownLimit() {
OLAPContext context = getFirstOLAPContext();
- return context.storageContext.isLimitEnabled();
+ return context.storageContext.isLimitPushDownEnabled();
}