You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/08/10 04:56:56 UTC
[1/2] kylin git commit: KYLIN-1936 improve enable limit logic
(exactAggregation is too strict)
Repository: kylin
Updated Branches:
refs/heads/master c67891d26 -> 28e942306
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 3c992d2..ff7fb2b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -21,153 +21,77 @@ package org.apache.kylin.storage.gtrecord;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
public class SequentialCubeTupleIterator implements ITupleIterator {
private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
- protected final Cuboid cuboid;
- protected final Set<TblColRef> selectedDimensions;
- protected final Set<FunctionDesc> selectedMetrics;
- protected final TupleInfo tupleInfo;
- protected final Tuple tuple;
- protected final Iterator<CubeSegmentScanner> scannerIterator;
- protected final StorageContext context;
-
- protected CubeSegmentScanner curScanner;
- protected Iterator<GTRecord> curRecordIterator;
- protected CubeTupleConverter curTupleConverter;
- protected Tuple next;
-
- private List<IAdvMeasureFiller> advMeasureFillers;
- private int advMeasureRowsRemaining;
- private int advMeasureRowIndex;
+ protected List<CubeSegmentScanner> scanners;
+ protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators;
+ protected Iterator<ITuple> tupleIterator;
+ protected final int storagePushDownLimit;
+ protected StorageContext context;
private int scanCount;
private int scanCountDelta;
public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
- this.cuboid = cuboid;
- this.selectedDimensions = selectedDimensions;
- this.selectedMetrics = selectedMetrics;
- this.tupleInfo = returnTupleInfo;
- this.tuple = new Tuple(returnTupleInfo);
- this.scannerIterator = scanners.iterator();
this.context = context;
- }
+ this.scanners = scanners;
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
-
- if (hitLimitAndThreshold())
- return false;
-
- // consume any left rows from advanced measure filler
- if (advMeasureRowsRemaining > 0) {
- for (IAdvMeasureFiller filler : advMeasureFillers) {
- filler.fillTuple(tuple, advMeasureRowIndex);
- }
- advMeasureRowIndex++;
- advMeasureRowsRemaining--;
- next = tuple;
- return true;
+ segmentCubeTupleIterators = Lists.newArrayList();
+ for (CubeSegmentScanner scanner : scanners) {
+ segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
}
-
- // get the next GTRecord
- if (curScanner == null) {
- if (scannerIterator.hasNext()) {
- curScanner = scannerIterator.next();
- curRecordIterator = curScanner.iterator();
- if (curRecordIterator.hasNext()) {
- //if the segment does not has any tuples, don't bother to create a converter
- curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+
+ this.storagePushDownLimit = context.getStoragePushDownLimit();
+ if (storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+ //normal case
+ tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
+ } else {
+ //query with limit
+ Iterator<Iterator<ITuple>> transformed = Iterators.transform(segmentCubeTupleIterators.iterator(), new Function<SegmentCubeTupleIterator, Iterator<ITuple>>() {
+ @Nullable
+ @Override
+ public Iterator<ITuple> apply(@Nullable SegmentCubeTupleIterator input) {
+ return input;
}
- } else {
- return false;
- }
- }
- if (curRecordIterator.hasNext() == false) {
- close(curScanner);
- curScanner = null;
- curRecordIterator = null;
- curTupleConverter = null;
- return hasNext();
+ });
+ tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, storagePushDownLimit, segmentCubeTupleIterators.get(0).getCubeTupleConverter().getTupleDimensionComparator()).getIterator();
}
-
- // now we have a GTRecord
- GTRecord curRecord = curRecordIterator.next();
-
- // translate into tuple
- advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);
-
- // the simple case
- if (advMeasureFillers == null) {
- next = tuple;
- return true;
- }
-
- // advanced measure filling, like TopN, will produce multiple tuples out of one record
- advMeasureRowsRemaining = -1;
- for (IAdvMeasureFiller filler : advMeasureFillers) {
- if (advMeasureRowsRemaining < 0)
- advMeasureRowsRemaining = filler.getNumOfRows();
- if (advMeasureRowsRemaining != filler.getNumOfRows())
- throw new IllegalStateException();
- }
- if (advMeasureRowsRemaining < 0)
- throw new IllegalStateException();
-
- advMeasureRowIndex = 0;
- return hasNext();
}
- private boolean hitLimitAndThreshold() {
- // check limit
- if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
- return true;
- }
- // check threshold
- if (scanCount >= context.getThreshold()) {
- throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
- }
- return false;
+ @Override
+ public boolean hasNext() {
+ return tupleIterator.hasNext();
}
@Override
public ITuple next() {
- // fetch next record
- if (next == null) {
- hasNext();
- if (next == null)
- throw new NoSuchElementException();
- }
-
scanCount++;
if (++scanCountDelta >= 1000)
flushScanCountDelta();
- ITuple result = next;
- next = null;
- return result;
+ return tupleIterator.next();
}
@Override
@@ -181,11 +105,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
// close all the remaining segmentIterator
flushScanCountDelta();
- if (curScanner != null)
- close(curScanner);
-
- while (scannerIterator.hasNext()) {
- close(scannerIterator.next());
+ for (SegmentCubeTupleIterator iterator : segmentCubeTupleIterators) {
+ iterator.close();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
new file mode 100644
index 0000000..d5aa9d0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * a merger that utilizes the sorted nature of input iterators
+ */
+public class SortedIteratorMerger<E> {
+
+ private Iterator<Iterator<E>> shardSubsets;
+ private Comparator<E> comparator;
+
+ public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) {
+ this.shardSubsets = shardSubsets;
+ this.comparator = comparator;
+ }
+
+ public Iterator<E> getIterator() {
+ final PriorityQueue<PeekingImpl<E>> heap = new PriorityQueue<PeekingImpl<E>>(11, new Comparator<PeekingImpl<E>>() {
+ @Override
+ public int compare(PeekingImpl<E> o1, PeekingImpl<E> o2) {
+ return comparator.compare(o1.peek(), o2.peek());
+ }
+ });
+
+ while (shardSubsets.hasNext()) {
+ Iterator<E> iterator = shardSubsets.next();
+ PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator);
+ if (peekingIterator.hasNext()) {
+ heap.offer(peekingIterator);
+ }
+ }
+
+ return getIteratorInternal(heap);
+ }
+
+ protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) {
+ return new MergedIterator<E>(heap, comparator);
+ }
+
+ private static class MergedIterator<E> implements Iterator<E> {
+
+ private final PriorityQueue<PeekingImpl<E>> heap;
+ private final Comparator<E> comparator;
+
+ public MergedIterator(PriorityQueue<PeekingImpl<E>> heap, Comparator<E> comparator) {
+ this.heap = heap;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !heap.isEmpty();
+ }
+
+ @Override
+ public E next() {
+ PeekingImpl<E> poll = heap.poll();
+ E current = poll.next();
+ if (poll.hasNext()) {
+
+ //TODO: remove this check when validated
+ Preconditions.checkState(comparator.compare(current, poll.peek()) < 0, "Not sorted! current: " + current + " Next: " + poll.peek());
+
+ heap.offer(poll);
+ }
+ return current;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
new file mode 100644
index 0000000..0e40150
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * the limit here correspond to the the limit in sql
+ * if the SQL ends with "limit N", then each shard will return N "smallest" records
+ * The query sever side will use a heap to pick right records.
+ *
+ * There're two usage of SortedIteratorMergerWithLimit in kylin
+ * One at GTRecord level and the other at Tuple Level
+ * The first is to deal with cuboid shards among the same segment
+ * and the second is to deal with multiple segments
+ *
+ * Let's use single-segment as an example:
+ * suppose we have a "limit 2" in SQL, and we have three shards in the segment
+ * the first returns (1,2), the second returns (1,3) and the third returns (2,3)
+ * each subset is guaranteed to be sorted. (that's why it's called "SortedIterator Merger")
+ * SortedIteratorMergerWithLimit will merge these three subsets and return (1,1,2,2)
+ *
+ */
+public class SortedIteratorMergerWithLimit<E extends Cloneable> extends SortedIteratorMerger<E> {
+ private int limit;
+ private Comparator<E> comparator;
+
+ public SortedIteratorMergerWithLimit(Iterator<Iterator<E>> shardSubsets, int limit, Comparator<E> comparator) {
+ super(shardSubsets, comparator);
+ this.limit = limit;
+ this.comparator = comparator;
+ }
+
+ protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) {
+ return new MergedIteratorWithLimit<E>(heap, limit, comparator);
+ }
+
+ static class MergedIteratorWithLimit<E extends Cloneable> implements Iterator<E> {
+
+ private final PriorityQueue<PeekingImpl<E>> heap;
+ private final Comparator<E> comparator;
+
+ private boolean nextFetched = false;
+ private E fetched = null;
+ private E last = null;
+
+ private int limit;
+ private int limitProgress = 0;
+
+ private PeekingImpl<E> lastSource = null;
+
+ public MergedIteratorWithLimit(PriorityQueue<PeekingImpl<E>> heap, int limit, Comparator<E> comparator) {
+ this.heap = heap;
+ this.limit = limit;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextFetched) {
+ return true;
+ }
+
+ if (lastSource != null && lastSource.hasNext()) {
+ if (lastSource.hasNext()) {
+ heap.offer(lastSource);
+ } else {
+ lastSource = null;
+ }
+ }
+
+ if (!heap.isEmpty()) {
+ PeekingImpl<E> first = heap.poll();
+ E current = first.next();
+ try {
+ current = (E) current.getClass().getMethod("clone").invoke(current);
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+
+ lastSource = first;
+
+ Preconditions.checkState(current != null);
+
+ if (last == null || comparator.compare(current, last) != 0) {
+ if (++limitProgress > limit) {
+ return false;
+ }
+ }
+ nextFetched = true;
+ fetched = current;
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public E next() {
+ if (!nextFetched) {
+ throw new IllegalStateException("Should hasNext() before next()");
+ }
+
+ //TODO: remove this check when validated
+ if (last != null) {
+ Preconditions.checkState(comparator.compare(last, fetched) <= 0, "Not sorted! last: " + last + " fetched: " + fetched);
+ }
+
+ last = fetched;
+ nextFetched = false;
+
+ return fetched;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
new file mode 100644
index 0000000..f09844a
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerTest {
+
+ private Comparator<Integer> getComp() {
+ return new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o1 - o2;
+ }
+ };
+ }
+
+ @Test
+ public void basic1() {
+
+ List<Integer> a = Lists.newArrayList(1, 2, 3);
+ List<Integer> b = Lists.newArrayList(1, 2, 3);
+ List<Integer> c = Lists.newArrayList(1, 2, 5);
+ List<Iterator<Integer>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+ Iterator<Integer> iterator = merger.getIterator();
+ List<Integer> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ Assert.assertEquals(Lists.newArrayList(1, 1, 1, 2, 2, 2, 3, 3, 5), result);
+ }
+
+ @Test
+ public void basic2() {
+
+ List<Integer> a = Lists.newArrayList(2);
+ List<Integer> b = Lists.newArrayList();
+ List<Integer> c = Lists.newArrayList(1, 2, 5);
+ List<Iterator<Integer>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+ Iterator<Integer> iterator = merger.getIterator();
+ List<Integer> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ Assert.assertEquals(Lists.newArrayList(1, 2, 2, 5), result);
+ }
+
+ @Test
+ public void basic3() {
+
+ List<Integer> a = Lists.newArrayList();
+ List<Integer> b = Lists.newArrayList();
+ List<Integer> c = Lists.newArrayList();
+ List<Iterator<Integer>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+ Iterator<Integer> iterator = merger.getIterator();
+ List<Integer> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ Assert.assertEquals(Lists.newArrayList(), result);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
new file mode 100644
index 0000000..1627b4f
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerWithLimitTest {
+ class CloneableInteger implements Cloneable {
+ int value;
+
+ public CloneableInteger(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public Object clone() {
+ return new CloneableInteger(value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ CloneableInteger that = (CloneableInteger) o;
+
+ return value == that.value;
+
+ }
+ }
+
+ private Comparator<CloneableInteger> getComp() {
+ return new Comparator<CloneableInteger>() {
+ @Override
+ public int compare(CloneableInteger o1, CloneableInteger o2) {
+ return o1.value - o2.value;
+ }
+ };
+ }
+
+ @Test
+ public void basic1() {
+
+ List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+ List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+ List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+ List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+ Iterator<CloneableInteger> iterator = merger.getIterator();
+ List<CloneableInteger> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(3), new CloneableInteger(3)), result);
+ }
+
+ @Test
+ public void basic2() {
+
+ List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2));
+ List<CloneableInteger> b = Lists.newArrayList();
+ List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+ List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+ Iterator<CloneableInteger> iterator = merger.getIterator();
+ List<CloneableInteger> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(5)), result);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void basic3() {
+
+ List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2), new CloneableInteger(1));
+ List<CloneableInteger> b = Lists.newArrayList();
+ List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+ List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+ input.add(a.iterator());
+ input.add(b.iterator());
+ input.add(c.iterator());
+ SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+ Iterator<CloneableInteger> iterator = merger.getIterator();
+ List<CloneableInteger> result = Lists.newArrayList();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
new file mode 100644
index 0000000..c295430
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import org.dbunit.DatabaseUnitException;
+import org.dbunit.assertion.DbUnitAssert;
+import org.dbunit.assertion.FailureHandler;
+import org.dbunit.dataset.Column;
+import org.dbunit.dataset.Columns;
+import org.dbunit.dataset.DataSetException;
+import org.dbunit.dataset.ITable;
+import org.dbunit.dataset.ITableMetaData;
+import org.dbunit.dataset.datatype.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * dirty hack to support checking result of SQL with limit
+ */
+public class HackedDbUnitAssert extends DbUnitAssert {
+ private static final Logger logger = LoggerFactory.getLogger(HackedDbUnitAssert.class);
+
+ public void assertEquals(ITable expectedTable, ITable actualTable, FailureHandler failureHandler) throws DatabaseUnitException {
+ logger.trace("assertEquals(expectedTable, actualTable, failureHandler) - start");
+ logger.debug("assertEquals: expectedTable={}", expectedTable);
+ logger.debug("assertEquals: actualTable={}", actualTable);
+ logger.debug("assertEquals: failureHandler={}", failureHandler);
+
+ // Do not continue if same instance
+ if (expectedTable == actualTable) {
+ logger.debug("The given tables reference the same object. Will return immediately. (Table={})", expectedTable);
+ return;
+ }
+
+ if (failureHandler == null) {
+ logger.debug("FailureHandler is null. Using default implementation");
+ failureHandler = getDefaultFailureHandler();
+ }
+
+ ITableMetaData expectedMetaData = expectedTable.getTableMetaData();
+ ITableMetaData actualMetaData = actualTable.getTableMetaData();
+ String expectedTableName = expectedMetaData.getTableName();
+
+ // // Verify row count
+ // int expectedRowsCount = expectedTable.getRowCount();
+ // int actualRowsCount = actualTable.getRowCount();
+ // if (expectedRowsCount != actualRowsCount) {
+ // String msg = "row count (table=" + expectedTableName + ")";
+ // Error error =
+ // failureHandler.createFailure(msg, String
+ // .valueOf(expectedRowsCount), String
+ // .valueOf(actualRowsCount));
+ // logger.error(error.toString());
+ // throw error;
+ // }
+
+ // if both tables are empty, it is not necessary to compare columns, as
+ // such
+ // comparison
+ // can fail if column metadata is different (which could occurs when
+ // comparing empty tables)
+ if (expectedTable.getRowCount() == 0 && actualTable.getRowCount() == 0) {
+ logger.debug("Tables are empty, hence equals.");
+ return;
+ }
+
+ // Put the columns into the same order
+ Column[] expectedColumns = Columns.getSortedColumns(expectedMetaData);
+ Column[] actualColumns = Columns.getSortedColumns(actualMetaData);
+
+ // Verify columns
+ Columns.ColumnDiff columnDiff = Columns.getColumnDiff(expectedMetaData, actualMetaData);
+ if (columnDiff.hasDifference()) {
+ String message = columnDiff.getMessage();
+ Error error = failureHandler.createFailure(message, Columns.getColumnNamesAsString(expectedColumns), Columns.getColumnNamesAsString(actualColumns));
+ logger.error(error.toString());
+ throw error;
+ }
+
+ // Get the datatypes to be used for comparing the sorted columns
+ ComparisonColumn[] comparisonCols = getComparisonColumns(expectedTableName, expectedColumns, actualColumns, failureHandler);
+
+ // Finally compare the data
+ compareData(expectedTable, actualTable, comparisonCols, failureHandler);
+ }
+
+ protected void compareData(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler) throws DataSetException {
+ logger.debug("compareData(expectedTable={}, actualTable={}, " + "comparisonCols={}, failureHandler={}) - start", new Object[] { expectedTable, actualTable, comparisonCols, failureHandler });
+
+ if (expectedTable == null) {
+ throw new NullPointerException("The parameter 'expectedTable' must not be null");
+ }
+ if (actualTable == null) {
+ throw new NullPointerException("The parameter 'actualTable' must not be null");
+ }
+ if (comparisonCols == null) {
+ throw new NullPointerException("The parameter 'comparisonCols' must not be null");
+ }
+ if (failureHandler == null) {
+ throw new NullPointerException("The parameter 'failureHandler' must not be null");
+ }
+
+ for (int index = 0; index < actualTable.getRowCount(); index++) {
+ if (!findRowInExpectedTable(expectedTable, actualTable, comparisonCols, failureHandler, index)) {
+ throw new IllegalStateException();
+ }
+ }
+
+ }
+
+ private boolean findRowInExpectedTable(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler, int index) throws DataSetException {
+
+ // iterate over all rows
+ for (int i = 0; i < expectedTable.getRowCount(); i++) {
+
+ // iterate over all columns of the current row
+ for (int j = 0; j < comparisonCols.length; j++) {
+ ComparisonColumn compareColumn = comparisonCols[j];
+
+ String columnName = compareColumn.getColumnName();
+ DataType dataType = compareColumn.getDataType();
+
+ Object expectedValue = expectedTable.getValue(i, columnName);
+ Object actualValue = actualTable.getValue(index, columnName);
+
+ // Compare the values
+ if (skipCompare(columnName, expectedValue, actualValue)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("ignoring comparison " + expectedValue + "=" + actualValue + " on column " + columnName);
+ }
+ continue;
+ }
+
+ if (dataType.compare(expectedValue, actualValue) != 0) {
+ break;
+
+ // Difference diff = new Difference(expectedTable, actualTable, i, columnName, expectedValue, actualValue);
+ //
+ // // Handle the difference (throw error immediately or something else)
+ // failureHandler.handle(diff);
+ } else {
+ if (j == comparisonCols.length - 1) {
+ return true;
+ } else {
+ continue;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index a6e7956..2c428ec 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -35,6 +35,7 @@ import org.apache.kylin.query.enumerator.OLAPQuery;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.routing.Candidate;
import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.HBaseStorage;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
@@ -123,7 +124,7 @@ public class ITKylinQueryTest extends KylinTestBase {
@Test
public void testSingleExecuteQuery() throws Exception {
- String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/sql_tableau/query20.sql";
+ String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/temp/query01.sql";
File sqlFile = new File(queryFileName);
String sql = getTextFromFile(sqlFile);
@@ -187,7 +188,7 @@ public class ITKylinQueryTest extends KylinTestBase {
@Test
public void testPreciselyDistinctCountQuery() throws Exception {
if ("left".equalsIgnoreCase(joinType)) {
- execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely", null, true);
+ execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/temp", null, true);
}
}
@@ -257,6 +258,13 @@ public class ITKylinQueryTest extends KylinTestBase {
}
@Test
+ public void testLimitCorrectness() throws Exception {
+ if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work
+ execLimitAndValidate(getQueryFolderPrefix() + "src/test/resources/query/sql");
+ }
+ }
+
+ @Test
public void testTopNQuery() throws Exception {
if ("left".equalsIgnoreCase(joinType)) {
this.execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_topn", null, true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 0511971..4e59815 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -34,6 +34,7 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -62,6 +63,33 @@ import com.google.common.io.Files;
*/
public class KylinTestBase {
+ class ObjectArray {
+ Object[] data;
+
+ public ObjectArray(Object[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ObjectArray that = (ObjectArray) o;
+
+ // Probably incorrect - comparing Object[] arrays with Arrays.equals
+ return Arrays.equals(data, that.data);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(data);
+ }
+ }
+
// Hack for the different constant integer type between optiq (INTEGER) and
// h2 (BIGINT)
public static class TestH2DataTypeFactory extends H2DataTypeFactory {
@@ -357,6 +385,50 @@ public class KylinTestBase {
}
}
+ protected void execLimitAndValidate(String queryFolder) throws Exception {
+ printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath());
+
+ int appendLimitQueries = 0;
+ List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+ for (File sqlFile : sqlFiles) {
+ String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+ String sql = getTextFromFile(sqlFile);
+
+ String sqlWithLimit;
+ if (sql.toLowerCase().contains("limit ")) {
+ sqlWithLimit = sql;
+ } else {
+ sqlWithLimit = sql + " limit 5";
+ appendLimitQueries++;
+ }
+
+ // execute Kylin
+ printInfo("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
+ IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+
+ // execute H2
+ printInfo("Query Result from H2 - " + queryName);
+ H2Connection h2Conn = new H2Connection(h2Connection, null);
+ h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
+ ITable h2Table = executeQuery(h2Conn, queryName, sql, false);
+
+ try {
+ HackedDbUnitAssert hackedDbUnitAssert = new HackedDbUnitAssert();
+ hackedDbUnitAssert.assertEquals(h2Table, kylinTable);
+ } catch (Throwable t) {
+ printInfo("execAndCompQuery failed on: " + sqlFile.getAbsolutePath());
+ throw t;
+ }
+
+ compQueryCount++;
+ if (kylinTable.getRowCount() == 0) {
+ zeroResultQueries.add(sql);
+ }
+ }
+ printInfo("Queries appended with limit: " + appendLimitQueries);
+ }
+
protected void execAndCompQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath());
Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
index da8e7ce..3fdb92f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
@@ -29,7 +29,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTSampleCodeSystem;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTWriter;
import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator;
@@ -109,7 +109,7 @@ public class HBaseScannerBenchmark {
private void testScanRaw(String msg) throws IOException {
long t = System.currentTimeMillis();
- IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null));
+ IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
ResultScanner innerScanner = ((SimpleHBaseStore.Reader) scan).getHBaseScanner();
int count = 0;
for (Result r : innerScanner) {
@@ -125,7 +125,7 @@ public class HBaseScannerBenchmark {
private void testScanRecords(String msg) throws IOException {
long t = System.currentTimeMillis();
- IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null));
+ IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int count = 0;
for (GTRecord rec : scan) {
count++;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1cebdea..4c599d9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -21,20 +21,11 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -52,7 +43,6 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
@@ -67,8 +57,6 @@ import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.Cub
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
@@ -79,153 +67,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
- static class ExpectedSizeIterator implements Iterator<byte[]> {
-
- BlockingQueue<byte[]> queue;
-
- int expectedSize;
- int current = 0;
- long timeout;
- long timeoutTS;
- volatile Throwable coprocException;
-
- public ExpectedSizeIterator(int expectedSize) {
- this.expectedSize = expectedSize;
- this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
-
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
- this.timeout = hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5) * hconf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000);
- this.timeout = Math.max(this.timeout, 5 * 60000);
- this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-
- if (BackdoorToggles.getQueryTimeout() != -1) {
- this.timeout = BackdoorToggles.getQueryTimeout();
- }
-
- this.timeout *= 1.1; // allow for some delay
-
- logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);
-
- this.timeoutTS = System.currentTimeMillis() + this.timeout;
- }
-
- @Override
- public boolean hasNext() {
- return (current < expectedSize);
- }
-
- @Override
- public byte[] next() {
- if (current >= expectedSize) {
- throw new IllegalStateException("Won't have more data");
- }
- try {
- current++;
- byte[] ret = null;
-
- while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) {
- ret = queue.poll(5000, TimeUnit.MILLISECONDS);
- }
-
- if (coprocException != null) {
- throw new RuntimeException("Error in coprocessor", coprocException);
- } else if (ret == null) {
- throw new RuntimeException("Timeout visiting cube!");
- } else {
- return ret;
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("Error when waiting queue", e);
- }
- }
-
- @Override
- public void remove() {
- throw new NotImplementedException();
- }
-
- public void append(byte[] data) {
- try {
- queue.put(data);
- } catch (InterruptedException e) {
- throw new RuntimeException("error when waiting queue", e);
- }
- }
-
- public long getTimeout() {
- return timeout;
- }
-
- public void notifyCoprocException(Throwable ex) {
- coprocException = ex;
- }
- }
-
- static class EndpointResultsAsGTScanner implements IGTScanner {
- private GTInfo info;
- private Iterator<byte[]> blocks;
- private ImmutableBitSet columns;
- private long totalScannedCount;
-
- public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount) {
- this.info = info;
- this.blocks = blocks;
- this.columns = columns;
- this.totalScannedCount = totalScannedCount;
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public long getScannedRowCount() {
- return totalScannedCount;
- }
-
- @Override
- public void close() throws IOException {
- //do nothing
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return Iterators.concat(Iterators.transform(blocks, new Function<byte[], Iterator<GTRecord>>() {
- @Nullable
- @Override
- public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
- return new Iterator<GTRecord>() {
- private ByteBuffer inputBuffer = null;
- private GTRecord oneRecord = null;
-
- @Override
- public boolean hasNext() {
- if (inputBuffer == null) {
- inputBuffer = ByteBuffer.wrap(input);
- oneRecord = new GTRecord(info);
- }
-
- return inputBuffer.position() < inputBuffer.limit();
- }
-
- @Override
- public GTRecord next() {
- oneRecord.loadColumns(columns, inputBuffer);
- return oneRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }));
- }
- }
-
public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
super(cubeSeg, cuboid, fullGTInfo);
}
@@ -345,7 +186,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
builder.setBehavior(toggle);
builder.setStartTime(System.currentTimeMillis());
- builder.setTimeout(epResultItr.getTimeout());
+ builder.setTimeout(epResultItr.getRpcTimeout());
builder.setKylinProperties(kylinConfig.getConfigAsString());
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
@@ -407,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
if (abnormalFinish[0]) {
- Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+ Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
epResultItr.notifyCoprocException(ex);
return;
@@ -416,7 +257,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
});
}
- return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
+ return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
}
private String getStatsString(byte[] region, CubeVisitResponse result) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
new file mode 100644
index 0000000..4e0d15e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+class ExpectedSizeIterator implements Iterator<byte[]> {
+
+ BlockingQueue<byte[]> queue;
+
+ int expectedSize;
+ int current = 0;
+ long rpcTimeout;
+ long timeout;
+ long timeoutTS;
+ volatile Throwable coprocException;
+
+ public ExpectedSizeIterator(int expectedSize) {
+ this.expectedSize = expectedSize;
+ this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ CubeHBaseEndpointRPC.logger.info("rpc timeout is {} and after multiply retry times become {}", this.rpcTimeout, this.timeout);
+ this.timeout = Math.max(this.timeout, 5 * 60000);
+ this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+
+ if (BackdoorToggles.getQueryTimeout() != -1) {
+ this.timeout = BackdoorToggles.getQueryTimeout();
+ }
+
+ this.timeout *= 1.1; // allow for some delay
+
+ CubeHBaseEndpointRPC.logger.info("Final Timeout for ExpectedSizeIterator is: " + this.timeout);
+
+ this.timeoutTS = System.currentTimeMillis() + this.timeout;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (current < expectedSize);
+ }
+
+ @Override
+ public byte[] next() {
+ if (current >= expectedSize) {
+ throw new IllegalStateException("Won't have more data");
+ }
+ try {
+ current++;
+ byte[] ret = null;
+
+ while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
+ ret = queue.poll(5000, TimeUnit.MILLISECONDS);
+ }
+
+ if (coprocException != null) {
+ throw new RuntimeException("Error in coprocessor", coprocException);
+ } else if (ret == null) {
+ throw new RuntimeException("Timeout visiting cube!");
+ } else {
+ return ret;
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Error when waiting queue", e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new NotImplementedException();
+ }
+
+ public void append(byte[] data) {
+ try {
+ queue.put(data);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("error when waiting queue", e);
+ }
+ }
+
+ public long getRpcTimeout() {
+ return this.rpcTimeout;
+ }
+
+ public void notifyCoprocException(Throwable ex) {
+ coprocException = ex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
new file mode 100644
index 0000000..631510e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * scatter the blob returned from region server to a iterable of gtrecords
+ */
+class GTBlobScatter implements IGTScanner {
+
+ private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class);
+
+ private GTInfo info;
+ private Iterator<byte[]> blocks;
+ private ImmutableBitSet columns;
+ private long totalScannedCount;
+ private int storagePushDownLimit = -1;
+
+ public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
+ this.info = info;
+ this.blocks = blocks;
+ this.columns = columns;
+ this.totalScannedCount = totalScannedCount;
+ this.storagePushDownLimit = storagePushDownLimit;
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public long getScannedRowCount() {
+ return totalScannedCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ //do nothing
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc());
+ if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+ return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
+ } else {
+ return Iterators.concat(shardSubsets);
+ }
+ }
+
+ class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> {
+ @Nullable
+ @Override
+ public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+ return new Iterator<GTRecord>() {
+ private ByteBuffer inputBuffer = null;
+ //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
+ private GTRecord firstRecord = null;
+ private GTRecord secondRecord = null;
+ private GTRecord thirdRecord = null;
+ private GTRecord fourthRecord = null;
+ private int counter = 0;
+
+ @Override
+ public boolean hasNext() {
+ if (inputBuffer == null) {
+ inputBuffer = ByteBuffer.wrap(input);
+ firstRecord = new GTRecord(info);
+ secondRecord = new GTRecord(info);
+ thirdRecord = new GTRecord(info);
+ fourthRecord = new GTRecord(info);
+ }
+
+ return inputBuffer.position() < inputBuffer.limit();
+ }
+
+ @Override
+ public GTRecord next() {
+ firstRecord.loadColumns(columns, inputBuffer);
+ //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
+ return firstRecord;
+ // GTRecord temp = new GTRecord(info);
+ // temp.loadColumns(columns, inputBuffer);
+ // return temp;
+
+ // counter++;
+ // int index = counter % 4;
+ // if (index == 1) {
+ // firstRecord.loadColumns(columns, inputBuffer);
+ // //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
+ // return firstRecord;
+ // } else if (index == 2) {
+ // secondRecord.loadColumns(columns, inputBuffer);
+ // //logger.info("B GTRecord: " + System.identityHashCode(this) + " " + secondRecord + " " + System.identityHashCode(secondRecord));
+ // return secondRecord;
+ // } else if (index == 3) {
+ // thirdRecord.loadColumns(columns, inputBuffer);
+ // //logger.info("C GTRecord: " + System.identityHashCode(this) + " " + thirdRecord + " " + System.identityHashCode(thirdRecord));
+ // return thirdRecord;
+ // } else {
+ // fourthRecord.loadColumns(columns, inputBuffer);
+ // //logger.info("D GTRecord: " + System.identityHashCode(this) + " " + fourthRecord + " " + System.identityHashCode(fourthRecord));
+ // return fourthRecord;
+ // }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5b7a26a..cbccac6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -48,7 +48,9 @@ import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanExceedThresholdException;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.measure.BufferedMeasureEncoder;
@@ -235,13 +237,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
- scanReq.setAggrCacheGB(0); // disable mem check if so told
+ scanReq.setAggCacheMemThreshold(0); // disable mem check if so told
}
final MutableBoolean scanNormalComplete = new MutableBoolean(true);
- final long startTime = this.serviceStartTime;
- final long timeout = request.getTimeout();
- final int rowLimit = scanReq.getRowLimit();
+ final long deadline = request.getTimeout() + this.serviceStartTime;
+ final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
final CellListIterator cellListIterator = new CellListIterator() {
@@ -256,19 +257,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() {
- if (rowLimit > 0 && rowLimit <= counter)
- return false;
- if (counter % 100000 == 1) {
- if (System.currentTimeMillis() - startTime > timeout) {
- scanNormalComplete.setValue(false);
- logger.error("scanner aborted because timeout");
- return false;
- }
+ if (counter > scanReq.getStorageScanRowNumThreshold()) {
+ throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter);
}
if (counter % 100000 == 1) {
- logger.info("Scanned " + counter + " rows.");
+ logger.info("Scanned " + counter + " rows from HBase.");
}
counter++;
return allCellLists.hasNext();
@@ -290,38 +285,47 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
- behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
+ behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
int finalRowCount = 0;
- for (GTRecord oneRecord : finalScanner) {
- if (!scanNormalComplete.booleanValue()) {
- logger.error("aggregate iterator aborted because input iterator aborts");
- break;
- }
+ try {
+ for (GTRecord oneRecord : finalScanner) {
- if (finalRowCount % 100000 == 1) {
- if (System.currentTimeMillis() - startTime > timeout) {
- logger.error("aggregate iterator aborted because timeout");
+ if (finalRowCount > storagePushDownLimit) {
+ logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
break;
}
- }
- buffer.clear();
- try {
- oneRecord.exportColumns(scanReq.getColumns(), buffer);
- } catch (BufferOverflowException boe) {
- buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
- oneRecord.exportColumns(scanReq.getColumns(), buffer);
- }
+ if (finalRowCount % 100000 == 1) {
+ if (System.currentTimeMillis() > deadline) {
+ throw new GTScanTimeoutException("finalScanner timeouts after scanned " + finalRowCount);
+ }
+ }
- outputStream.write(buffer.array(), 0, buffer.position());
- finalRowCount++;
+ buffer.clear();
+ try {
+ oneRecord.exportColumns(scanReq.getColumns(), buffer);
+ } catch (BufferOverflowException boe) {
+ buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
+ oneRecord.exportColumns(scanReq.getColumns(), buffer);
+ }
+
+ outputStream.write(buffer.array(), 0, buffer.position());
+ finalRowCount++;
+ }
+ } catch (GTScanTimeoutException e) {
+ scanNormalComplete.setValue(false);
+ logger.info("The cube visit did not finish normally because scan timeout", e);
+ } catch (GTScanExceedThresholdException e) {
+ scanNormalComplete.setValue(false);
+ logger.info("The cube visit did not finish normally because scan num exceeds threshold", e);
+ } finally {
+ finalScanner.close();
}
- finalScanner.close();
appendProfileInfo(sb, "agg done");
[2/2] kylin git commit: KYLIN-1936 improve enable limit logic
(exactAggregation is too strict)
Posted by ma...@apache.org.
KYLIN-1936 improve enable limit logic (exactAggregation is too strict)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/28e94230
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/28e94230
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/28e94230
Branch: refs/heads/master
Commit: 28e942306d545ff3b5f604e8225bacad650ad8ac
Parents: c67891d
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Aug 5 12:50:27 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Aug 10 12:56:32 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 6 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 3 +-
.../cube/inmemcubing/DoggedCubeBuilder.java | 4 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 9 +-
.../kylin/gridtable/GTAggregateScanner.java | 38 ++++-
.../java/org/apache/kylin/gridtable/GTInfo.java | 2 +-
.../org/apache/kylin/gridtable/GTRecord.java | 33 +++-
.../GTScanExceedThresholdException.java | 26 +++
.../kylin/gridtable/GTScanRangePlanner.java | 2 +-
.../apache/kylin/gridtable/GTScanRequest.java | 92 +++++-----
.../kylin/gridtable/GTScanRequestBuilder.java | 97 +++++++++++
.../kylin/gridtable/GTScanTimeoutException.java | 26 +++
.../gridtable/benchmark/GTScannerBenchmark.java | 5 +-
.../benchmark/GTScannerBenchmark2.java | 5 +-
.../inmemcubing/ConcurrentDiskStoreTest.java | 4 +-
.../cube/inmemcubing/MemDiskStoreTest.java | 4 +-
.../gridtable/AggregationCacheSpillTest.java | 12 +-
.../kylin/gridtable/DictGridTableTest.java | 10 +-
.../kylin/gridtable/SimpleGridTableTest.java | 4 +-
.../org/apache/kylin/metadata/tuple/ITuple.java | 2 +-
.../org/apache/kylin/metadata/tuple/Tuple.java | 5 +
.../apache/kylin/storage/StorageContext.java | 4 +
.../storage/gtrecord/CubeSegmentScanner.java | 8 +-
.../storage/gtrecord/CubeTupleConverter.java | 148 +++++++++++-----
.../gtrecord/FetchSourceAwareIterator.java | 24 +++
.../gtrecord/GTCubeStorageQueryBase.java | 47 +++++-
.../storage/gtrecord/IFetchSourceAware.java | 25 +++
.../kylin/storage/gtrecord/PeekingImpl.java | 73 ++++++++
.../gtrecord/SegmentCubeTupleIterator.java | 162 ++++++++++++++++++
.../gtrecord/SequentialCubeTupleIterator.java | 151 ++++-------------
.../storage/gtrecord/SortedIteratorMerger.java | 100 +++++++++++
.../gtrecord/SortedIteratorMergerWithLimit.java | 143 ++++++++++++++++
.../gtrecord/SortedIteratorMergerTest.java | 97 +++++++++++
.../SortedIteratorMergerWithLimitTest.java | 127 ++++++++++++++
.../apache/kylin/query/HackedDbUnitAssert.java | 169 +++++++++++++++++++
.../apache/kylin/query/ITKylinQueryTest.java | 12 +-
.../org/apache/kylin/query/KylinTestBase.java | 72 ++++++++
.../hbase/cube/HBaseScannerBenchmark.java | 6 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 165 +-----------------
.../hbase/cube/v2/ExpectedSizeIterator.java | 116 +++++++++++++
.../storage/hbase/cube/v2/GTBlobScatter.java | 150 ++++++++++++++++
.../coprocessor/endpoint/CubeVisitService.java | 70 ++++----
42 files changed, 1811 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f51dce6..eb4102a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -23,9 +23,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.SortedSet;
+import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -470,6 +470,10 @@ abstract public class KylinConfigBase implements Serializable {
return Float.parseFloat(getOptional("kylin.hbase.hfile.size.gb", "2.0"));
}
+ public int getStoragePushDownLimitMax() {
+ return Integer.parseInt(getOptional("kylin.query.pushdown.limit.max", "10000"));
+ }
+
public int getScanThreshold() {
return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index e385ab9..d417d11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.TblColRef;
@@ -83,7 +84,7 @@ abstract public class AbstractInMemCubeBuilder {
protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
long startTime = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = gridTable.scan(req);
for (GTRecord record : scanner) {
output.write(cuboidId, record);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 606c820..15f2241 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -35,7 +35,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.TblColRef;
@@ -399,7 +399,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
if (cuboidIterator.hasNext()) {
CuboidResult cuboid = cuboidIterator.next();
currentCuboidId = cuboid.cuboidId;
- scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+ scanner = cuboid.table.scan(new GTScanRequestBuilder().setInfo(cuboid.table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
recordIterator = scanner.iterator();
} else {
return false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e12e815..36d1296 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -31,8 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.topn.Counter;
@@ -329,8 +330,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
- GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
- GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+ GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, Long.MAX_VALUE);
aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
int count = 0;
@@ -397,7 +398,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
GTInfo info = gridTable.getInfo();
- GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
// for child cuboid, some measures don't need aggregation.
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cb23af4..ccf4895 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -63,24 +63,28 @@ public class GTAggregateScanner implements IGTScanner {
final IGTScanner inputScanner;
final AggregationCache aggrCache;
final long spillThreshold;
+ final int storagePushDownLimit;//default to be Int.MAX
+ final long deadline;
private int aggregatedRowCount = 0;
private MemoryWaterLevel memTracker;
private boolean[] aggrMask;
- public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline) {
if (!req.hasAggregation())
throw new IllegalStateException();
this.info = inputScanner.getInfo();
- this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+ this.dimensions = req.getDimensions();
this.groupBy = req.getAggrGroupBy();
this.metrics = req.getAggrMetrics();
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
this.aggrCache = new AggregationCache();
- this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+ this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
this.aggrMask = new boolean[metricsAggrFuncs.length];
+ this.storagePushDownLimit = req.getStoragePushDownLimit();
+ this.deadline = deadline;
Arrays.fill(aggrMask, true);
}
@@ -133,8 +137,25 @@ public class GTAggregateScanner implements IGTScanner {
public Iterator<GTRecord> iterator() {
long count = 0;
for (GTRecord r : inputScanner) {
+
count++;
- aggrCache.aggregate(r);
+
+ if (getNumOfSpills() == 0) {
+ //check limit
+ boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
+
+ if (!ret) {
+ logger.info("abort reading inputScanner because storage push down limit is hit");
+ break;//limit is hit
+ }
+ } else {//else if dumps is not empty, it means a lot of row need aggregated, so it's less likely that limit clause is helping
+ aggrCache.aggregate(r, Integer.MAX_VALUE);
+ }
+
+ //check deadline
+ if (count % 10000 == 1 && System.currentTimeMillis() > deadline) {
+ throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
+ }
}
logger.info("GTAggregateScanner input rows: " + count);
return aggrCache.iterator();
@@ -241,7 +262,7 @@ public class GTAggregateScanner implements IGTScanner {
return result;
}
- void aggregate(GTRecord r) {
+ boolean aggregate(GTRecord r, int stopForLimit) {
if (++aggregatedRowCount % 100000 == 0) {
if (memTracker != null) {
memTracker.markHigh();
@@ -257,6 +278,12 @@ public class GTAggregateScanner implements IGTScanner {
final byte[] key = createKey(r);
MeasureAggregator[] aggrs = aggBufMap.get(key);
if (aggrs == null) {
+
+ //for storage push down limit
+ if (aggBufMap.size() >= stopForLimit) {
+ return false;
+ }
+
aggrs = newAggregators();
aggBufMap.put(key, aggrs);
}
@@ -267,6 +294,7 @@ public class GTAggregateScanner implements IGTScanner {
aggrs[i].aggregate(metrics);
}
}
+ return true;
}
private void spillBuffMap() throws RuntimeException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 673d22e..21da4ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -151,7 +151,7 @@ public class GTInfo {
if (!expected.equals(ref))
throw new IllegalArgumentException();
}
-
+
void validate() {
if (codeSystem == null)
throw new IllegalStateException();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 37f42c7..4d26029 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,6 +20,7 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import org.apache.kylin.common.util.ByteArray;
@@ -27,7 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
import com.google.common.base.Preconditions;
-public class GTRecord implements Comparable<GTRecord> {
+public class GTRecord implements Comparable<GTRecord>, Cloneable {
final transient GTInfo info;
final ByteArray[] cols;
@@ -54,6 +55,11 @@ public class GTRecord implements Comparable<GTRecord> {
}
}
+ @Override
+ public Object clone() {
+ return new GTRecord(this);
+ }
+
public GTInfo getInfo() {
return info;
}
@@ -189,12 +195,33 @@ public class GTRecord implements Comparable<GTRecord> {
@Override
public int compareTo(GTRecord o) {
+ return compareToInternal(o, info.colAll);
+ }
+
+ public int compareToOnPrimaryKey(GTRecord o) {
+ return compareToInternal(o, info.primaryKey);
+ }
+
+ public static Comparator<GTRecord> getPrimaryKeyComparator() {
+ return new Comparator<GTRecord>() {
+ @Override
+ public int compare(GTRecord o1, GTRecord o2) {
+ if (o1 == null || o2 == null) {
+ throw new IllegalStateException("Cannot handle null");
+ }
+
+ return o1.compareToOnPrimaryKey(o2);
+ }
+ };
+ }
+
+ private int compareToInternal(GTRecord o, ImmutableBitSet participateCols) {
assert this.info == o.info; // reference equal for performance
IGTComparator comparator = info.codeSystem.getComparator();
int comp = 0;
- for (int i = 0; i < info.colAll.trueBitCount(); i++) {
- int c = info.colAll.trueBitAt(i);
+ for (int i = 0; i < participateCols.trueBitCount(); i++) {
+ int c = participateCols.trueBitAt(i);
comp = comparator.compare(cols[c], o.cols[c]);
if (comp != 0)
return comp;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
new file mode 100644
index 0000000..dd57e90
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanExceedThresholdException extends RuntimeException {
+
+ public GTScanExceedThresholdException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 17d27f9..b8f7e0e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -165,7 +165,7 @@ public class GTScanRangePlanner {
GTScanRequest scanRequest;
List<GTScanRange> scanRanges = this.planScanRanges();
if (scanRanges != null && scanRanges.size() != 0) {
- scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
+ scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest();
} else {
scanRequest = null;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 0ce6b4c..e2bac3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -57,20 +57,14 @@ public class GTScanRequest {
private String[] aggrMetricsFuncs;
// hint to storage behavior
- private boolean allowPreAggregation = true;
- private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
- this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
- }
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
- ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
- this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
- }
-
- public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
- ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
+ private boolean allowStorageAggregation;
+ private double aggCacheMemThreshold;
+ private int storageScanRowNumThreshold;
+ private int storagePushDownLimit;
+
+ GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+ ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
+ double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
this.info = info;
if (ranges == null) {
this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -84,8 +78,10 @@ public class GTScanRequest {
this.aggrMetrics = aggrMetrics;
this.aggrMetricsFuncs = aggrMetricsFuncs;
- this.allowPreAggregation = allowPreAggregation;
- this.aggrCacheGB = aggrCacheGB;
+ this.allowStorageAggregation = allowStorageAggregation;
+ this.aggCacheMemThreshold = aggCacheMemThreshold;
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ this.storagePushDownLimit = storagePushDownLimit;
validate(info);
}
@@ -143,7 +139,7 @@ public class GTScanRequest {
}
public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
- return decorateScanner(scanner, true, true);
+ return decorateScanner(scanner, true, true, Long.MAX_VALUE);
}
/**
@@ -152,7 +148,7 @@ public class GTScanRequest {
* <p/>
* Refer to CoprocessorBehavior for explanation
*/
- public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
+ public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, long deadline) throws IOException {
IGTScanner result = scanner;
if (!doFilter) { //Skip reading this section if you're not profiling!
int scanned = lookAndForget(result);
@@ -169,11 +165,11 @@ public class GTScanRequest {
return new EmptyGTScanner(scanned);
}
- if (!this.allowPreAggregation) {
+ if (!this.isAllowStorageAggregation()) {
logger.info("pre aggregation is not beneficial, skip it");
} else if (this.hasAggregation()) {
logger.info("pre aggregating results before returning");
- result = new GTAggregateScanner(result, this);
+ result = new GTAggregateScanner(result, this, deadline);
} else {
logger.info("has no aggregation, skip it");
}
@@ -235,6 +231,10 @@ public class GTScanRequest {
return filterPushDown;
}
+ public ImmutableBitSet getDimensions() {
+ return this.getColumns().andNot(this.getAggrMetrics());
+ }
+
public ImmutableBitSet getAggrGroupBy() {
return aggrGroupBy;
}
@@ -247,34 +247,41 @@ public class GTScanRequest {
return aggrMetricsFuncs;
}
- public boolean isAllowPreAggregation() {
- return allowPreAggregation;
+ public boolean isAllowStorageAggregation() {
+ return allowStorageAggregation;
}
- public void setAllowPreAggregation(boolean allowPreAggregation) {
- this.allowPreAggregation = allowPreAggregation;
+ public void setAllowStorageAggregation(boolean allowStorageAggregation) {
+ this.allowStorageAggregation = allowStorageAggregation;
}
- public double getAggrCacheGB() {
- if (aggrCacheGB < 0)
+ public double getAggCacheMemThreshold() {
+ if (aggCacheMemThreshold < 0)
return 0;
else
- return aggrCacheGB;
+ return aggCacheMemThreshold;
}
- public void setAggrCacheGB(double gb) {
- this.aggrCacheGB = gb;
+ public void setAggCacheMemThreshold(double gb) {
+ this.aggCacheMemThreshold = gb;
}
- public int getRowLimit() {
- if (aggrCacheGB < 0)
- return (int) -aggrCacheGB;
- else
- return 0;
+ public int getStorageScanRowNumThreshold() {
+ return storageScanRowNumThreshold;
+ }
+
+ public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+ logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold);
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ }
+
+ public int getStoragePushDownLimit() {
+ return this.storagePushDownLimit;
}
- public void setRowLimit(int limit) {
- aggrCacheGB = -limit;
+ public void setStoragePushDownLimit(int limit) {
+ logger.info("storagePushDownLimit is set to " + storagePushDownLimit);
+ this.storagePushDownLimit = limit;
}
public List<Integer> getRequiredMeasures() {
@@ -323,8 +330,10 @@ public class GTScanRequest {
ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
- BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
- out.putDouble(value.aggrCacheGB);
+ BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out);
+ out.putDouble(value.aggCacheMemThreshold);
+ BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
+ BytesUtil.writeVInt(value.storagePushDownLimit, out);
}
@Override
@@ -353,8 +362,13 @@ public class GTScanRequest {
String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
double sAggrCacheGB = in.getDouble();
+ int storageScanRowNumThreshold = BytesUtil.readVInt(in);
+ int storagePushDownLimit = BytesUtil.readVInt(in);
- return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+ return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
+ setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
+ setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
+ setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
}
private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
new file mode 100644
index 0000000..49ec759
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+public class GTScanRequestBuilder {
+ private GTInfo info;
+ private List<GTScanRange> ranges;
+ private TupleFilter filterPushDown;
+ private ImmutableBitSet dimensions;
+ private ImmutableBitSet aggrGroupBy = null;
+ private ImmutableBitSet aggrMetrics = null;
+ private String[] aggrMetricsFuncs = null;
+ private boolean allowStorageAggregation = true;
+ private double aggCacheMemThreshold = 0;
+ private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.
+ private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $toragePushDownLimit aggregated rows are produced.
+
+ public GTScanRequestBuilder setInfo(GTInfo info) {
+ this.info = info;
+ return this;
+ }
+
+ public GTScanRequestBuilder setRanges(List<GTScanRange> ranges) {
+ this.ranges = ranges;
+ return this;
+ }
+
+ public GTScanRequestBuilder setFilterPushDown(TupleFilter filterPushDown) {
+ this.filterPushDown = filterPushDown;
+ return this;
+ }
+
+ public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
+ this.dimensions = dimensions;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrGroupBy(ImmutableBitSet aggrGroupBy) {
+ this.aggrGroupBy = aggrGroupBy;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrMetrics(ImmutableBitSet aggrMetrics) {
+ this.aggrMetrics = aggrMetrics;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggrMetricsFuncs(String[] aggrMetricsFuncs) {
+ this.aggrMetricsFuncs = aggrMetricsFuncs;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAllowStorageAggregation(boolean allowStorageAggregation) {
+ this.allowStorageAggregation = allowStorageAggregation;
+ return this;
+ }
+
+ public GTScanRequestBuilder setAggCacheMemThreshold(double aggCacheMemThreshold) {
+ this.aggCacheMemThreshold = aggCacheMemThreshold;
+ return this;
+ }
+
+ public GTScanRequestBuilder setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+ this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+ return this;
+ }
+
+ public GTScanRequestBuilder setStoragePushDownLimit(int storagePushDownLimit) {
+ this.storagePushDownLimit = storagePushDownLimit;
+ return this;
+ }
+
+ public GTScanRequest createGTScanRequest() {
+ return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
new file mode 100644
index 0000000..e92dae3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanTimeoutException extends RuntimeException {
+
+ public GTScanTimeoutException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 680ba33..589f37c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTSampleCodeSystem;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -110,7 +111,7 @@ public class GTScannerBenchmark {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
@@ -154,7 +155,7 @@ public class GTScannerBenchmark {
@SuppressWarnings("unused")
private void testFilter(TupleFilter filter) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index ae86e46..40a5e01 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTSampleCodeSystem;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
@@ -132,7 +133,7 @@ public class GTScannerBenchmark2 {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
@@ -176,7 +177,7 @@ public class GTScannerBenchmark2 {
@SuppressWarnings("unused")
private void testFilter(TupleFilter filter) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
index ceaf80d..6355f4a 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.UnitTestSupport;
@@ -84,7 +84,7 @@ public class ConcurrentDiskStoreTest extends LocalFileMetadataTestCase {
t[i] = new Thread() {
public void run() {
try {
- IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo(), null, null, null));
+ IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int i = 0;
for (GTRecord r : scanner) {
assertEquals(data.get(i++), r);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index 807a6e3..06ade1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.UnitTestSupport;
@@ -100,7 +100,7 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
}
builder.close();
- IGTScanner scanner = table.scan(new GTScanRequest(info, null, null, null));
+ IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int i = 0;
for (GTRecord r : scanner) {
assertEquals(data.get(i++), r);
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index 4160f86..b5f6de7 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -84,10 +84,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
}
};
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
- scanRequest.setAggrCacheGB(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+ scanRequest.setAggCacheMemThreshold(0.5);
- GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
int count = 0;
for (GTRecord record : scanner) {
@@ -127,10 +127,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
};
// all-in-mem testcase
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
- scanRequest.setAggrCacheGB(0.5);
+ GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+ scanRequest.setAggCacheMemThreshold(0.5);
- GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
int count = 0;
for (GTRecord record : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index af39e21..74338af 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -256,7 +256,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@Test
public void verifyFirstRow() throws IOException {
- doScanAndVerify(table, new GTScanRequest(table.getInfo(), null, null, null), "[1421193600000, 30, Yang, 10, 10.5]", //
+ doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
"[1421193600000, 30, Luke, 10, 10.5]", //
"[1421280000000, 20, Dong, 10, 10.5]", //
"[1421280000000, 20, Jason, 10, 10.5]", //
@@ -276,7 +276,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
- Assert.assertEquals(origin.getAggrCacheGB(), sGTScanRequest.getAggrCacheGB(), 0.01);
+ Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
return sGTScanRequest;
}
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
// note the unEvaluatable column 1 in filter is added to group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
LogicalTupleFilter filter = and(fComp1, fComp2);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
// note the evaluatable column 1 in filter is added to returned columns but not in group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -334,7 +334,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@SuppressWarnings("unused")
private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
long start = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, null, filter);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
IGTScanner scanner = table.scan(req);
int i = 0;
for (GTRecord r : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 2abe928..fd571d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -90,7 +90,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
}
private IGTScanner scan(GridTable table) throws IOException {
- GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = table.scan(req);
for (GTRecord r : scanner) {
Object[] v = r.getValues();
@@ -106,7 +106,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
}
private IGTScanner scanAndAggregate(GridTable table) throws IOException {
- GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0, 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[]{"count", "sum"}).setFilterPushDown(null).createGTScanRequest();
IGTScanner scanner = table.scan(req);
int i = 0;
for (GTRecord r : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
index 7d401ec..742cfba 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
*
* @author yangli9
*/
-public interface ITuple extends IEvaluatableTuple {
+public interface ITuple extends IEvaluatableTuple, Cloneable {
List<String> getAllFields();
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index 14d717e..54e5786 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -55,6 +55,11 @@ public class Tuple implements ITuple {
}
@Override
+ public Object clone() {
+ return makeCopy();
+ }
+
+ @Override
public ITuple makeCopy() {
Tuple ret = new Tuple(this.info);
for (int i = 0; i < this.values.length; ++i) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 90a2e43..acb4960 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -104,6 +104,10 @@ public class StorageContext {
return this.enableLimit;
}
+ public int getStoragePushDownLimit() {
+ return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
+ }
+
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9ca53f9..83ee6c7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -69,10 +69,10 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
if (scanRequest != null) {
- scanRequest.setAllowPreAggregation(context.isNeedStorageAggregation());
- scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
- if (context.isLimitEnabled())
- scanRequest.setRowLimit(context.getLimit());
+ scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation());
+ scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: devide by shard number?
+ scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit());
}
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 68556d6..0f96e3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -18,10 +18,11 @@
package org.apache.kylin.storage.gtrecord;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Dictionary;
@@ -36,9 +37,11 @@ import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -62,8 +65,10 @@ public class CubeTupleConverter {
final int nSelectedDims;
+ final int[] dimensionIndexOnTuple;
+
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = returnTupleInfo;
@@ -83,6 +88,20 @@ public class CubeTupleConverter {
advMeasureFillers = Lists.newArrayListWithCapacity(1);
advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+ // dimensionIndexOnTuple is for SQL with limit
+ List<Integer> temp = Lists.newArrayList();
+ for (TblColRef dim : cuboid.getColumns()) {
+ if (tupleInfo.hasColumn(dim)) {
+ temp.add(tupleInfo.getColumnIndex(dim));
+ }
+ }
+ dimensionIndexOnTuple = new int[temp.size()];
+ for (int i = 0; i < temp.size(); i++) {
+ dimensionIndexOnTuple[i] = temp.get(i);
+ }
+
+ ////////////
+
int iii = 0;
// pre-calculate dimension index mapping to tuple
@@ -90,6 +109,11 @@ public class CubeTupleConverter {
int i = mapping.getIndexOf(dim);
gtColIdx[iii] = i;
tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+
+ // if (tupleIdx[iii] == -1) {
+ // throw new IllegalStateException("dim not used in tuple:" + dim);
+ // }
+
iii++;
}
@@ -131,6 +155,44 @@ public class CubeTupleConverter {
}
}
+ public Comparator<ITuple> getTupleDimensionComparator() {
+ return new Comparator<ITuple>() {
+ @Override
+ public int compare(ITuple o1, ITuple o2) {
+ Preconditions.checkNotNull(o1);
+ Preconditions.checkNotNull(o2);
+ for (int i = 0; i < dimensionIndexOnTuple.length; i++) {
+ int index = dimensionIndexOnTuple[i];
+
+ if (index == -1) {
+ //TODO:
+ continue;
+ }
+
+ Comparable a = (Comparable) o1.getAllValues()[index];
+ Comparable b = (Comparable) o2.getAllValues()[index];
+
+ if (a == null && b == null) {
+ continue;
+ } else if (a == null) {
+ return 1;
+ } else if (b == null) {
+ return -1;
+ } else {
+ int temp = a.compareTo(b);
+ if (temp != 0) {
+ return temp;
+ } else {
+ continue;
+ }
+ }
+ }
+
+ return 0;
+ }
+ };
+ }
+
// load only needed dictionaries
private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
@@ -201,55 +263,55 @@ public class CubeTupleConverter {
return null;
switch (deriveInfo.type) {
- case LOOKUP:
- return new IDerivedColumnFiller() {
- CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
- LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
- int[] derivedColIdx = initDerivedColIdx();
- Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
- private int[] initDerivedColIdx() {
- int[] idx = new int[deriveInfo.columns.length];
- for (int i = 0; i < idx.length; i++) {
- idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ case LOOKUP:
+ return new IDerivedColumnFiller() {
+ CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+ LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+ int[] derivedColIdx = initDerivedColIdx();
+ Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+ private int[] initDerivedColIdx() {
+ int[] idx = new int[deriveInfo.columns.length];
+ for (int i = 0; i < idx.length; i++) {
+ idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+ }
+ return idx;
}
- return idx;
- }
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- for (int i = 0; i < hostTmpIdx.length; i++) {
- lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
- }
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ for (int i = 0; i < hostTmpIdx.length; i++) {
+ lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+ }
- String[] lookupRow = lookupTable.getRow(lookupKey);
+ String[] lookupRow = lookupTable.getRow(lookupKey);
- if (lookupRow != null) {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- String value = lookupRow[derivedColIdx[i]];
- tuple.setDimensionValue(derivedTupleIdx[i], value);
+ if (lookupRow != null) {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ String value = lookupRow[derivedColIdx[i]];
+ tuple.setDimensionValue(derivedTupleIdx[i], value);
+ }
}
- }
- } else {
- for (int i = 0; i < derivedTupleIdx.length; i++) {
- if (derivedTupleIdx[i] >= 0) {
- tuple.setDimensionValue(derivedTupleIdx[i], null);
+ } else {
+ for (int i = 0; i < derivedTupleIdx.length; i++) {
+ if (derivedTupleIdx[i] >= 0) {
+ tuple.setDimensionValue(derivedTupleIdx[i], null);
+ }
}
}
}
- }
- };
- case PK_FK:
- return new IDerivedColumnFiller() {
- @Override
- public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
- // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
- tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
- }
- };
- default:
- throw new IllegalArgumentException();
+ };
+ case PK_FK:
+ return new IDerivedColumnFiller() {
+ @Override
+ public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+ // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+ tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+ }
+ };
+ default:
+ throw new IllegalArgumentException();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
new file mode 100644
index 0000000..cb83819
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+interface FetchSourceAwareIterator<F> extends IFetchSourceAware<F>, Iterator<F> {
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 7acf186..ae5240b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -101,13 +101,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(exactAggregation);
- context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
+ context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
+ enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
- setLimit(filter, context);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -227,6 +227,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return !isExactAggregation;
}
+ //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive
+ //to post aggregation. Now that we don't have such measure any more, isExactAggregation should be useless (at least in v2 storage and above)
public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
boolean exact = true;
@@ -372,11 +374,44 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void setLimit(TupleFilter filter, StorageContext context) {
- boolean goodAggr = context.isExactAggregation();
+ private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+ boolean possible = true;
+
boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
- boolean goodSort = context.hasSort() == false;
- if (goodAggr && goodFilter && goodSort) {
+ if (!goodFilter) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+ }
+
+ boolean goodSort = !context.hasSort();
+ if (!goodSort) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because the query has order by");
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (!groups.containsAll(derivedPostAggregation)) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because derived column require post aggregation: " + derivedPostAggregation);
+ }
+
+ //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
+ int size = groupsD.size();
+ if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD //
+ + " with cuboid columns: " + cuboid.getColumns());
+ }
+
+ //if exists measures like max(cal_dt), then it's not a perfect cuboid match, cannot apply limit
+ for (FunctionDesc functionDesc : functionDescs) {
+ if (functionDesc.isDimensionAsMetric()) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc);
+ }
+ }
+
+ if (possible) {
logger.info("Enable limit " + context.getLimit());
context.enableLimit();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
new file mode 100644
index 0000000..d51ca4c
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+public interface IFetchSourceAware<E> {
+ public Iterator<? extends E> getFetchSource();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
new file mode 100644
index 0000000..96a232f
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+
+/**
+ * copied from guava, change iterator access modifier to public
+ *
+ * Implementation of PeekingIterator that avoids peeking unless necessary.
+ */
+class PeekingImpl<E> implements PeekingIterator<E> {
+
+ public final Iterator<? extends E> iterator;
+ private boolean hasPeeked;
+ private E peekedElement;
+
+ public PeekingImpl(Iterator<? extends E> iterator) {
+ this.iterator = checkNotNull(iterator);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasPeeked || iterator.hasNext();
+ }
+
+ @Override
+ public E next() {
+ if (!hasPeeked) {
+ return iterator.next();
+ }
+ E result = peekedElement;
+ hasPeeked = false;
+ peekedElement = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ checkState(!hasPeeked, "Can't remove after you've peeked at next");
+ iterator.remove();
+ }
+
+ @Override
+ public E peek() {
+ if (!hasPeeked) {
+ peekedElement = iterator.next();
+ hasPeeked = true;
+ }
+ return peekedElement;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
new file mode 100644
index 0000000..61267ae
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SegmentCubeTupleIterator implements ITupleIterator {
+
+ private static final Logger logger = LoggerFactory.getLogger(SegmentCubeTupleIterator.class);
+
+ protected final CubeSegmentScanner scanner;
+ protected final Cuboid cuboid;
+ protected final Set<TblColRef> selectedDimensions;
+ protected final Set<FunctionDesc> selectedMetrics;
+ protected final TupleInfo tupleInfo;
+ protected final Tuple tuple;
+ protected final StorageContext context;
+
+ protected Iterator<GTRecord> gtItr;
+ protected CubeTupleConverter cubeTupleConverter;
+ protected Tuple next;
+
+ private List<IAdvMeasureFiller> advMeasureFillers;
+ private int advMeasureRowsRemaining;
+ private int advMeasureRowIndex;
+
+ public SegmentCubeTupleIterator(CubeSegmentScanner scanner, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+ Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+ this.scanner = scanner;
+ this.cuboid = cuboid;
+ this.selectedDimensions = selectedDimensions;
+ this.selectedMetrics = selectedMetrics;
+ this.tupleInfo = returnTupleInfo;
+ this.tuple = new Tuple(returnTupleInfo);
+ this.context = context;
+ this.gtItr = getGTItr(scanner);
+ this.cubeTupleConverter = new CubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ }
+
+ private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
+ return scanner.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ // consume any left rows from advanced measure filler
+ if (advMeasureRowsRemaining > 0) {
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ filler.fillTuple(tuple, advMeasureRowIndex);
+ }
+ advMeasureRowIndex++;
+ advMeasureRowsRemaining--;
+ next = tuple;
+ return true;
+ }
+
+ // now we have a GTRecord
+ if (!gtItr.hasNext()) {
+ return false;
+ }
+ GTRecord curRecord = gtItr.next();
+
+ Preconditions.checkNotNull(cubeTupleConverter);
+
+ // translate into tuple
+ advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+
+ // the simple case
+ if (advMeasureFillers == null) {
+ next = tuple;
+ return true;
+ }
+
+ // advanced measure filling, like TopN, will produce multiple tuples out of one record
+ advMeasureRowsRemaining = -1;
+ for (IAdvMeasureFiller filler : advMeasureFillers) {
+ if (advMeasureRowsRemaining < 0)
+ advMeasureRowsRemaining = filler.getNumOfRows();
+ if (advMeasureRowsRemaining != filler.getNumOfRows())
+ throw new IllegalStateException();
+ }
+ if (advMeasureRowsRemaining < 0)
+ throw new IllegalStateException();
+
+ advMeasureRowIndex = 0;
+ return hasNext();
+ }
+
+ @Override
+ public ITuple next() {
+ // fetch next record
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
+ }
+
+ ITuple result = next;
+ next = null;
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ close(scanner);
+ }
+
+ protected void close(CubeSegmentScanner scanner) {
+ try {
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("Exception when close CubeScanner", e);
+ }
+ }
+
+ public CubeTupleConverter getCubeTupleConverter() {
+ return cubeTupleConverter;
+ }
+}