You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/30 12:07:33 UTC

[2/5] kylin git commit: optimize SortedIteratorMergerWithLimit when only 1 shard

Optimize SortedIteratorMergerWithLimit when there is only one shard

temp


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/85fcf019
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/85fcf019
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/85fcf019

Branch: refs/heads/yang22-cdh5.7
Commit: 85fcf019205c3e5e84d1224d1f19e166d3641b0e
Parents: 5c70af2
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Dec 30 17:13:14 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Dec 30 18:09:51 2016 +0800

----------------------------------------------------------------------
 .../storage/gtrecord/CubeSegmentScanner.java     |  6 ++++--
 .../gtrecord/SequentialCubeTupleIterator.java    | 10 +++++++---
 .../storage/gtrecord/SortedIteratorMerger.java   | 19 ++++++++++++++++++-
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9d6f946..6fd88b7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -50,12 +50,13 @@ public class CubeSegmentScanner implements IGTScanner {
     final Cuboid cuboid;
 
     final GTScanRequest scanRequest;
+    final boolean isEmpty;
 
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
             Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) {
-        
+
         logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
-        
+
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
 
@@ -80,6 +81,7 @@ public class CubeSegmentScanner implements IGTScanner {
         scanRequest = scanRangePlanner.planScanRequest();
         String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+        isEmpty = scanRequest == null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index ee868c7..49080d6 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -60,9 +60,13 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
         segmentCubeTupleIterators = Lists.newArrayList();
         for (CubeSegmentScanner scanner : scanners) {
-            segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+            if (!scanner.isEmpty) {
+                segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+            }
         }
 
+        logger.info("Number of segmentCubeTupleIterators: {}", segmentCubeTupleIterators.size());
+
         if (!context.isLimitEnabled()) {
             //normal case
             tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
@@ -78,7 +82,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
             tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator();
         }
     }
-    
+
     public Comparator<ITuple> getTupleDimensionComparator(Cuboid cuboid, TupleInfo returnTupleInfo) {
         // dimensionIndexOnTuple is for SQL with limit
         List<Integer> temp = Lists.newArrayList();
@@ -92,7 +96,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         for (int i = 0; i < temp.size(); i++) {
             dimensionIndexOnTuple[i] = temp.get(i);
         }
-        
+
         return new Comparator<ITuple>() {
             @Override
             public int compare(ITuple o1, ITuple o2) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
index d5aa9d0..bca8068 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
@@ -22,6 +22,9 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.PriorityQueue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -29,9 +32,14 @@ import com.google.common.base.Preconditions;
  */
 public class SortedIteratorMerger<E> {
 
+    private static final Logger logger = LoggerFactory.getLogger(SortedIteratorMerger.class);
+
     private Iterator<Iterator<E>> shardSubsets;
     private Comparator<E> comparator;
 
+    private int shardCount = 0;
+    private Iterator<E> firstItr;
+
     public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) {
         this.shardSubsets = shardSubsets;
         this.comparator = comparator;
@@ -47,12 +55,22 @@ public class SortedIteratorMerger<E> {
 
         while (shardSubsets.hasNext()) {
             Iterator<E> iterator = shardSubsets.next();
+            if (shardCount++ == 0) {
+                firstItr = iterator;
+            }
+
             PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator);
+
             if (peekingIterator.hasNext()) {
                 heap.offer(peekingIterator);
             }
         }
 
+        if (shardCount == 1) {
+            logger.info("SortedIteratorMerger will downgrade to a single normal iterator");
+            return firstItr;
+        }
+
         return getIteratorInternal(heap);
     }
 
@@ -94,7 +112,6 @@ public class SortedIteratorMerger<E> {
             throw new UnsupportedOperationException();
         }
 
-     
     }
 
 }