You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/12 11:48:49 UTC

[17/48] kylin git commit: KYLIN-2483 SortedIteratorMergerWithLimit could be slower when number of total merge rows is small

KYLIN-2483 SortedIteratorMergerWithLimit could be slower when number of total merge rows is small


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fc68437
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fc68437
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fc68437

Branch: refs/heads/master-hbase0.98
Commit: 3fc6843736f877712cd24ed8cafba4554b5e4d31
Parents: dd7507a
Author: Hongbin Ma <ma...@apache.org>
Authored: Sun Mar 5 22:44:14 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Sun Mar 5 22:44:20 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  5 ++-
 .../apache/kylin/storage/StorageContext.java    | 45 +++++++++++++++-----
 .../gtrecord/SequentialCubeTupleIterator.java   | 21 +++------
 .../gtrecord/StorageResponseGTScatter.java      |  7 +--
 .../org/apache/kylin/query/KylinTestBase.java   |  2 +-
 5 files changed, 50 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index df3e914..3a87d3d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -773,7 +773,6 @@ abstract public class KylinConfigBase implements Serializable {
     // ENGINE.SPARK
     // ============================================================================
 
-
     public String getHadoopConfDir() {
         return getOptional("kylin.env.hadoop-conf-dir", "");
     }
@@ -817,6 +816,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    public int getMergeSortPartitionResultsMinLimit() {
+        return Integer.parseInt(getOptional("kylin.query.merge-sort-partition-results.min-limit", "100"));
+    }
+
     public long getQueryMaxScanBytes() {
         long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
         return value > 0 ? value : Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index f58fb34..f4211ff 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,9 +20,11 @@ package org.apache.kylin.storage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +47,6 @@ public class StorageContext {
 
     private boolean exactAggregation = false;
     private boolean needStorageAggregation = false;
-    private boolean limitEnabled = false;
     private boolean enableCoprocessor = false;
 
     private IStorageQuery storageQuery;
@@ -63,7 +64,9 @@ public class StorageContext {
         this.connUrl = connUrl;
     }
 
-    public int getLimit() {
+    //the limit here corresponds to the limit concept in SQL,
+    //and also takes Statement.setMaxRows in JDBC into consideration
+    private int getLimit() {
         if (overlookOuterLimit || BackdoorToggles.getStatementMaxRows() == null || BackdoorToggles.getStatementMaxRows() == 0) {
             return limit;
         } else {
@@ -71,10 +74,6 @@ public class StorageContext {
         }
     }
 
-    public void setOverlookOuterLimit() {
-        this.overlookOuterLimit = true;
-    }
-
     public void setLimit(int l) {
         if (limit != Integer.MAX_VALUE) {
             logger.warn("Setting limit to {} but in current olap context, the limit is already {}, won't apply", l, limit);
@@ -83,6 +82,12 @@ public class StorageContext {
         }
     }
 
+    //the outer limit is something like Statement.setMaxRows in JDBC
+    public void setOverlookOuterLimit() {
+        this.overlookOuterLimit = true;
+    }
+
+    //the offset here corresponds to the offset concept in SQL
     public int getOffset() {
         return offset;
     }
@@ -91,8 +96,18 @@ public class StorageContext {
         this.offset = offset;
     }
 
-    public boolean isLimitEnabled() {
-        return this.limitEnabled;
+    /**
+     * In contrast to the SQL concept of limit, "limit push down" means
+     * whether the limit is effective at the storage level. Some queries cannot
+     * leverage the limit clause; check out
+     * {@link GTCubeStorageQueryBase#enableStorageLimitIfPossible(org.apache.kylin.cube.cuboid.Cuboid, java.util.Collection, java.util.Set, java.util.Collection, org.apache.kylin.metadata.filter.TupleFilter, java.util.Set, java.util.Collection, org.apache.kylin.storage.StorageContext)}
+     */
+    public boolean isLimitPushDownEnabled() {
+        return isValidPushDownLimit(finalPushDownLimit);
+    }
+
+    public static boolean isValidPushDownLimit(int finalPushDownLimit) {
+        return finalPushDownLimit < Integer.MAX_VALUE && finalPushDownLimit > 0;
     }
 
     public int getFinalPushDownLimit() {
@@ -101,7 +116,7 @@ public class StorageContext {
 
     public void setFinalPushDownLimit(IRealization realization) {
 
-        if (this.getLimit() == Integer.MAX_VALUE) {
+        if (!isValidPushDownLimit(this.getLimit())) {
             return;
         }
 
@@ -110,12 +125,20 @@ public class StorageContext {
         if (!realization.supportsLimitPushDown()) {
             logger.warn("Not enabling limit push down because cube storage type not supported");
         } else {
-            this.limitEnabled = true;
             this.finalPushDownLimit = tempPushDownLimit;
-            logger.info("Enable limit: " + tempPushDownLimit);
+            logger.info("Enable limit (storage push down limit) :" + tempPushDownLimit);
         }
     }
 
+    public boolean mergeSortPartitionResults() {
+        return mergeSortPartitionResults(finalPushDownLimit);
+    }
+
+    public static boolean mergeSortPartitionResults(int finalPushDownLimit) {
+        return isValidPushDownLimit(finalPushDownLimit) && //
+                (finalPushDownLimit > KylinConfig.getInstanceFromEnv().getMergeSortPartitionResultsMinLimit());
+    }
+
     public long getDeadline() {
         return this.deadline;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 14b6394..9d5d816 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -37,7 +35,6 @@ import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -64,19 +61,15 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
             segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
         }
 
-        if (!context.isLimitEnabled()) {
-            //normal case
-            tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
-        } else {
+        if (context.mergeSortPartitionResults()) {
             //query with limit
-            Iterator<Iterator<ITuple>> transformed = Iterators.transform(segmentCubeTupleIterators.iterator(), new Function<SegmentCubeTupleIterator, Iterator<ITuple>>() {
-                @Nullable
-                @Override
-                public Iterator<ITuple> apply(@Nullable SegmentCubeTupleIterator input) {
-                    return input;
-                }
-            });
+            logger.info("Using SortedIteratorMergerWithLimit to merge segment results");
+            Iterator<Iterator<ITuple>> transformed = (Iterator<Iterator<ITuple>>) (Iterator<?>) segmentCubeTupleIterators.iterator();
             tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator();
+        } else {
+            //normal case
+            logger.info("Using Iterators.concat to merge segment results");
+            tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 3904b5c..1a80bbf 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +70,11 @@ public class StorageResponseGTScatter implements IGTScanner {
     @Override
     public Iterator<GTRecord> iterator() {
         Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
-        if (storagePushDownLimit != Integer.MAX_VALUE) {
-            logger.info("Using SortedIteratorMergerWithLimit to merge partitions");
+        if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
+            logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
             return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
         } else {
-            logger.info("Using Iterators.concat to merge partitions");
+            logger.info("Using Iterators.concat to merge partition results");
             return Iterators.concat(shardSubsets);
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc68437/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index d0524c6..2174094 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -682,7 +682,7 @@ public class KylinTestBase {
 
     protected boolean checkFinalPushDownLimit() {
         OLAPContext context = getFirstOLAPContext();
-        return context.storageContext.isLimitEnabled();
+        return context.storageContext.isLimitPushDownEnabled();
 
     }