You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/30 12:07:33 UTC
[2/5] kylin git commit: optimize SortedIteratorMergerWithLimit when only 1 shard
optimize SortedIteratorMergerWithLimit when only 1 shard
temp
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/85fcf019
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/85fcf019
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/85fcf019
Branch: refs/heads/yang22-cdh5.7
Commit: 85fcf019205c3e5e84d1224d1f19e166d3641b0e
Parents: 5c70af2
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Dec 30 17:13:14 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Dec 30 18:09:51 2016 +0800
----------------------------------------------------------------------
.../storage/gtrecord/CubeSegmentScanner.java | 6 ++++--
.../gtrecord/SequentialCubeTupleIterator.java | 10 +++++++---
.../storage/gtrecord/SortedIteratorMerger.java | 19 ++++++++++++++++++-
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9d6f946..6fd88b7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -50,12 +50,13 @@ public class CubeSegmentScanner implements IGTScanner {
final Cuboid cuboid;
final GTScanRequest scanRequest;
+ final boolean isEmpty;
public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) {
-
+
logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName());
-
+
this.cuboid = cuboid;
this.cubeSeg = cubeSeg;
@@ -80,6 +81,7 @@ public class CubeSegmentScanner implements IGTScanner {
scanRequest = scanRangePlanner.planScanRequest();
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+ isEmpty = scanRequest == null;
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index ee868c7..49080d6 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -60,9 +60,13 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
segmentCubeTupleIterators = Lists.newArrayList();
for (CubeSegmentScanner scanner : scanners) {
- segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+ if (!scanner.isEmpty) {
+ segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
+ }
}
+ logger.info("Number of segmentCubeTupleIterators: {}", segmentCubeTupleIterators.size());
+
if (!context.isLimitEnabled()) {
//normal case
tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
@@ -78,7 +82,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator();
}
}
-
+
public Comparator<ITuple> getTupleDimensionComparator(Cuboid cuboid, TupleInfo returnTupleInfo) {
// dimensionIndexOnTuple is for SQL with limit
List<Integer> temp = Lists.newArrayList();
@@ -92,7 +96,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
for (int i = 0; i < temp.size(); i++) {
dimensionIndexOnTuple[i] = temp.get(i);
}
-
+
return new Comparator<ITuple>() {
@Override
public int compare(ITuple o1, ITuple o2) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/85fcf019/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
index d5aa9d0..bca8068 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
@@ -22,6 +22,9 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
/**
@@ -29,9 +32,14 @@ import com.google.common.base.Preconditions;
*/
public class SortedIteratorMerger<E> {
+ private static final Logger logger = LoggerFactory.getLogger(SortedIteratorMerger.class);
+
private Iterator<Iterator<E>> shardSubsets;
private Comparator<E> comparator;
+ private int shardCount = 0;
+ private Iterator<E> firstItr;
+
public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) {
this.shardSubsets = shardSubsets;
this.comparator = comparator;
@@ -47,12 +55,22 @@ public class SortedIteratorMerger<E> {
while (shardSubsets.hasNext()) {
Iterator<E> iterator = shardSubsets.next();
+ if (shardCount++ == 0) {
+ firstItr = iterator;
+ }
+
PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator);
+
if (peekingIterator.hasNext()) {
heap.offer(peekingIterator);
}
}
+ if (shardCount == 1) {
+ logger.info("SortedIteratorMerger will downgrade to a single normal iterator");
+ return firstItr;
+ }
+
return getIteratorInternal(heap);
}
@@ -94,7 +112,6 @@ public class SortedIteratorMerger<E> {
throw new UnsupportedOperationException();
}
-
}
}