You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 08:39:11 UTC
[1/5] kylin git commit: minor,
set default cubing algorithm to layer for stability [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2501 d045a045a -> 782a97482 (forced update)
minor, set default cubing algorithm to layer for stability
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a776ef67
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a776ef67
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a776ef67
Branch: refs/heads/KYLIN-2501
Commit: a776ef6747638ab757c9a5298b500dd18756f5b5
Parents: 4c21821
Author: Li Yang <li...@apache.org>
Authored: Wed Mar 29 11:51:22 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Mar 29 11:57:24 2017 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a776ef67/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 1a55c94..d39f7ee 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -144,7 +144,7 @@ kylin.engine.mr.uhc-reducer-count=1
### CUBE | DICTIONARY ###
# 'auto', 'inmem' or 'layer'
-kylin.cube.algorithm=auto
+kylin.cube.algorithm=layer
# A smaller threshold prefers layer, a larger threshold prefers in-mem
kylin.cube.algorithm.layer-or-inmem-threshold=7
[4/5] kylin git commit: KYLIN-2501 Stream Aggregate GTRecords at
Query Server
Posted by li...@apache.org.
KYLIN-2501 Stream Aggregate GTRecords at Query Server
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0fa57248
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0fa57248
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0fa57248
Branch: refs/heads/KYLIN-2501
Commit: 0fa572482f209d25cca6968812d293c08d077210
Parents: fa3ee3f
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 15 22:45:02 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 16:39:35 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/common/util/ImmutableBitSet.java | 29 ++-
.../org/apache/kylin/GTForwardingScanner.java | 56 +++++
.../kylin/cube/gridtable/CubeGridTable.java | 18 --
.../gridtable/CuboidToGridTableMapping.java | 18 ++
.../cube/inmemcubing/InMemCubeBuilder.java | 6 +-
.../kylin/gridtable/GTAggregateScanner.java | 16 +-
.../apache/kylin/gridtable/GTFilterScanner.java | 22 +-
.../org/apache/kylin/gridtable/GTRecord.java | 80 +++----
.../apache/kylin/gridtable/GTScanRequest.java | 13 ++
.../gridtable/GTStreamAggregateScanner.java | 211 +++++++++++++++++++
.../kylin/gridtable/GTScanReqSerDerTest.java | 4 +-
.../apache/kylin/storage/StorageContext.java | 20 ++
.../storage/gtrecord/CubeScanRangePlanner.java | 3 +-
.../storage/gtrecord/CubeSegmentScanner.java | 7 +-
.../storage/gtrecord/CubeTupleConverter.java | 31 +--
.../gtrecord/GTCubeStorageQueryBase.java | 38 +++-
.../kylin/storage/gtrecord/ITupleConverter.java | 3 +-
.../gtrecord/PartitionResultIterator.java | 59 ++++++
.../storage/gtrecord/PartitionResultMerger.java | 100 +++++++++
.../kylin/storage/gtrecord/ScannerWorker.java | 5 +-
.../gtrecord/SegmentCubeTupleIterator.java | 72 ++++++-
.../gtrecord/StorageResponseGTScatter.java | 82 +++----
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 5 +-
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 5 +-
26 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 02349ad..9cd35c8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true"));
}
+ public boolean isStreamAggregateEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
+ }
+
@Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold
public int getStoragePushDownLimitMax() {
return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index b417877..5cdf08c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -19,8 +19,9 @@ package org.apache.kylin.common.util;
import java.nio.ByteBuffer;
import java.util.BitSet;
+import java.util.Iterator;
-public class ImmutableBitSet {
+public class ImmutableBitSet implements Iterable<Integer> {
public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
@@ -168,4 +169,30 @@ public class ImmutableBitSet {
return new ImmutableBitSet(bitSet);
}
};
+
+ /**
+ * Iterate over the positions of true value.
+ * @return the iterator
+ */
+ @Override
+ public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return index < arr.length;
+ }
+
+ @Override
+ public Integer next() {
+ return arr[index++];
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
new file mode 100644
index 0000000..de8c88d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link IGTScanner} which forwards all its method calls to another scanner.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ */
+public class GTForwardingScanner implements IGTScanner {
+ protected IGTScanner delegated;
+
+ protected GTForwardingScanner(IGTScanner delegated) {
+ this.delegated = checkNotNull(delegated, "delegated");
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return delegated.getInfo();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegated.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return delegated.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 563cf43..5cee9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -18,29 +18,11 @@
package org.apache.kylin.cube.gridtable;
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.IDimensionEncodingMap;
import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
public class CubeGridTable {
-
- public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
- Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
- return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg));
- }
-
- public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
- return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap));
- }
-
public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) {
CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 2e5dd12..6879687 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -140,11 +140,29 @@ public class CuboidToGridTableMapping {
return i == null ? -1 : i.intValue();
}
+ public int[] getDimIndexes(Collection<TblColRef> dims) {
+ int[] result = new int[dims.size()];
+ int i = 0;
+ for (TblColRef dim : dims) {
+ result[i++] = getIndexOf(dim);
+ }
+ return result;
+ }
+
public int getIndexOf(FunctionDesc metric) {
Integer r = metrics2gt.get(metric);
return r == null ? -1 : r;
}
+ public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) {
+ int[] result = new int[metrics.size()];
+ int i = 0;
+ for (FunctionDesc metric : metrics) {
+ result[i++] = getIndexOf(metric);
+ }
+ return result;
+ }
+
public List<TblColRef> getCuboidDimensionsInGTOrder() {
return cuboid.getColumns();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e08844e..a26e948 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
@@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
- GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+ GTInfo info = CubeGridTable.newGTInfo(
+ Cuboid.findById(cubeDesc, cuboidID),
+ new CubeDimEncMap(cubeDesc, dictionaryMap)
+ );
// Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
// MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7cdd4f5..0dd6fa9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.metadata.datatype.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner {
final ImmutableBitSet metrics;
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
+ final BufferedMeasureCodec measureCodec;
final AggregationCache aggrCache;
final long spillThreshold; // 0 means no memory control && no spill
final int storagePushDownLimit;//default to be Int.MAX
@@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner {
this.metrics = req.getAggrMetrics();
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
+ this.measureCodec = req.createMeasureCodec();
this.aggrCache = new AggregationCache();
this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
this.aggrMask = new boolean[metricsAggrFuncs.length];
@@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner {
final int keyLength;
final boolean[] compareMask;
boolean compareAll = true;
- final BufferedMeasureCodec measureCodec;
final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
@Override
@@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner {
keyLength = compareMask.length;
dumps = Lists.newArrayList();
aggBufMap = createBuffMap();
- measureCodec = createMeasureCodec();
- }
-
- private BufferedMeasureCodec createMeasureCodec() {
- DataType[] types = new DataType[metrics.trueBitCount()];
- for (int i = 0; i < types.length; i++) {
- types[i] = info.getColumnType(metrics.trueBitAt(i));
- }
-
- BufferedMeasureCodec result = new BufferedMeasureCodec(types);
- result.setBufferSize(info.getMaxColumnLength(metrics));
- return result;
}
private boolean[] createCompareMask() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 717f89c..cad0a04 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.kylin.GTForwardingScanner;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-public class GTFilterScanner implements IGTScanner {
+public class GTFilterScanner extends GTForwardingScanner {
- final private IGTScanner inputScanner;
final private TupleFilter filter;
final private IFilterCodeSystem<ByteArray> filterCodeSystem;
final private IEvaluatableTuple oneTuple; // avoid instance creation
private GTRecord next = null;
- public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
- this.inputScanner = inputScanner;
+ public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
+ super(delegated);
this.filter = req.getFilterPushDown();
this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
this.oneTuple = new IEvaluatableTuple() {
@@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner {
}
};
- if (TupleFilter.isEvaluableRecursively(filter) == false)
+ if (!TupleFilter.isEvaluableRecursively(filter))
throw new IllegalArgumentException();
}
@Override
- public GTInfo getInfo() {
- return inputScanner.getInfo();
- }
-
- @Override
- public void close() throws IOException {
- inputScanner.close();
- }
-
- @Override
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
- private Iterator<GTRecord> inputIterator = inputScanner.iterator();
+ private Iterator<GTRecord> inputIterator = delegated.iterator();
private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter);
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index f4480c8..3397adc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -21,7 +21,6 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.List;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -46,18 +45,21 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
}
this.info = info;
}
-
- public GTRecord(GTRecord other) {
- this.info = other.info;
- this.cols = new ByteArray[info.getColumnCount()];
- for (int i = 0; i < other.cols.length; i++) {
- this.cols[i] = other.cols[i].copy();
+
+ @Override
+ public GTRecord clone() { // deep copy
+ ByteArray[] cols = new ByteArray[this.cols.length];
+ for (int i = 0; i < cols.length; i++) {
+ cols[i] = this.cols[i].copy();
}
+ return new GTRecord(this.info, cols);
}
- @Override
- public Object clone() {
- return new GTRecord(this);
+ public void shallowCopyFrom(GTRecord source) {
+ assert info == source.info;
+ for (int i = 0; i < cols.length; i++) {
+ cols[i].set(source.cols[i]);
+ }
}
public GTInfo getInfo() {
@@ -106,30 +108,18 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
/** decode and return the values of this record */
public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
assert selectedCols.cardinality() == result.length;
-
for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- if (cols[c] == null || cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
+ result[i] = decodeValue(selectedCols.trueBitAt(i));
}
return result;
}
- /** decode and return the values of this record */
- public Object[] getValues(int[] selectedColumns, Object[] result) {
- assert selectedColumns.length <= result.length;
- for (int i = 0; i < selectedColumns.length; i++) {
- int c = selectedColumns[i];
- if (cols[c].array() == null) {
- result[i] = null;
- } else {
- result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
- }
+ public Object decodeValue(int c) {
+ ByteArray col = cols[c];
+ if (col != null && col.array() != null) {
+ return info.codeSystem.decodeColumnValue(c, col.asBuffer());
}
- return result;
+ return null;
}
public int sizeOf(ImmutableBitSet selectedCols) {
@@ -198,19 +188,13 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
return compareToInternal(o, info.colAll);
}
- public int compareToOnPrimaryKey(GTRecord o) {
- return compareToInternal(o, info.primaryKey);
- }
-
- public static Comparator<GTRecord> getPrimaryKeyComparator() {
+ public static Comparator<GTRecord> getComparator(final ImmutableBitSet participateCols) {
return new Comparator<GTRecord>() {
- @Override
public int compare(GTRecord o1, GTRecord o2) {
if (o1 == null || o2 == null) {
throw new IllegalStateException("Cannot handle null");
}
-
- return o1.compareToOnPrimaryKey(o2);
+ return o1.compareToInternal(o2, participateCols);
}
};
}
@@ -287,26 +271,14 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
loadColumns(info.colBlocks[c], buf);
}
- /** change pointers to point to data in given buffer, UNLIKE deserialize */
- public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
- int pos = buf.position();
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
- int len = info.codeSystem.codeLength(c, buf);
- cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
- pos += len;
- buf.position(pos);
- }
- }
-
- /** change pointers to point to data in given buffer, UNLIKE deserialize
- * unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
- * method allows to defined specific columns(in order) to load
+ /**
+ * Change pointers to point to data in given buffer, UNLIKE deserialize
+ * @param selectedCols positions of column to load
+ * @param buf buffer containing continuous data of selected columns
*/
- public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+ public void loadColumns(Iterable<Integer> selectedCols, ByteBuffer buf) {
int pos = buf.position();
- for (int i = 0; i < selectedCols.size(); i++) {
- int c = selectedCols.get(i);
+ for (int c : selectedCols) {
int len = info.codeSystem.codeLength(c, buf);
cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
pos += len;
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4629c8e..ae35d2b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.SerializeToByteBuffer;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -202,6 +204,17 @@ public class GTScanRequest {
}
+ public BufferedMeasureCodec createMeasureCodec() {
+ DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()];
+ for (int i = 0; i < metricTypes.length; i++) {
+ metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i));
+ }
+
+ BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes);
+ codec.setBufferSize(info.getMaxColumnLength(aggrMetrics));
+ return codec;
+ }
+
public boolean isDoingStorageAggregation() {
return doingStorageAggregation;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
new file mode 100644
index 0000000..1fde423
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.kylin.GTForwardingScanner;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * GTStreamAggregateScanner requires input records to be sorted on group fields.
+ * In such cases, it's superior to hash/sort based aggregator because it can produce
+ * ordered outputs on the fly and the memory consumption is very low.
+ */
+public class GTStreamAggregateScanner extends GTForwardingScanner {
+ private final GTScanRequest req;
+ private final Comparator<GTRecord> keyComparator;
+
+ public GTStreamAggregateScanner(IGTScanner delegated,
+ GTScanRequest req, Comparator<GTRecord> keyComparator) {
+ super(delegated);
+ this.req = req;
+ this.keyComparator = keyComparator;
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return new StreamMergeGTRecordIterator(delegated.iterator());
+ }
+
+ public Iterator<Object[]> valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) {
+ return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx);
+ }
+
+ private abstract class AbstractStreamMergeIterator<E> implements Iterator<E> {
+ final PeekingIterator<GTRecord> input;
+ final IGTCodeSystem codeSystem;
+ final ImmutableBitSet dimensions;
+ final ImmutableBitSet metrics;
+ final String[] metricFuncs;
+ final BufferedMeasureCodec measureCodec;
+
+ private final GTRecord first; // reuse to avoid object creation
+
+ AbstractStreamMergeIterator(Iterator<GTRecord> input) {
+ this.input = Iterators.peekingIterator(input);
+ this.codeSystem = req.getInfo().getCodeSystem();
+ this.dimensions = req.getDimensions();
+ this.metrics = req.getAggrMetrics();
+ this.metricFuncs = req.getAggrMetricsFuncs();
+ this.measureCodec = req.createMeasureCodec();
+
+ this.first = new GTRecord(req.getInfo());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ private boolean isSameKey(GTRecord o1, GTRecord o2) {
+ return keyComparator.compare(o1, o2) == 0;
+ }
+
+ private boolean shouldMergeNext(GTRecord current) {
+ return input.hasNext() && isSameKey(current, input.peek());
+ }
+
+ protected abstract E finalizeResult(GTRecord record);
+
+ protected abstract E finalizeResult(GTRecord record, Object[] aggStates);
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // WATCH OUT! record returned by "input" scanner could be changed later,
+ // so we must make a shallow copy of it.
+ first.shallowCopyFrom(input.next());
+
+ // shortcut to avoid extra deserialize/serialize cost
+ if (!shouldMergeNext(first)) {
+ return finalizeResult(first);
+ }
+ // merge records with the same key
+ MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs);
+ aggregate(aggrs, first);
+ aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later
+ while (shouldMergeNext(first)) {
+ aggregate(aggrs, input.next());
+ }
+
+ Object[] aggStates = new Object[aggrs.length];
+ for (int i = 0; i < aggStates.length; i++) {
+ aggStates[i] = aggrs[i].getState();
+ }
+ return finalizeResult(first, aggStates);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) {
+ for (int i = 0; i < aggregators.length; i++) {
+ int c = metrics.trueBitAt(i);
+ Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer());
+ aggregators[i].aggregate(metric);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ }
+
+ private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator<GTRecord> {
+
+ private GTRecord returnRecord; // avoid object creation
+
+ StreamMergeGTRecordIterator(Iterator<GTRecord> input) {
+ super(input);
+ this.returnRecord = new GTRecord(req.getInfo());
+ }
+
+ @Override
+ protected GTRecord finalizeResult(GTRecord record) {
+ return record;
+ }
+
+ @Override
+ protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) {
+ // 1. load dimensions
+ for (int c : dimensions) {
+ returnRecord.cols[c] = record.cols[c];
+ }
+ // 2. serialize metrics
+ byte[] bytes = measureCodec.encode(aggStates).array();
+ int[] sizes = measureCodec.getMeasureSizes();
+ // 3. load metrics
+ int offset = 0;
+ for (int i = 0; i < metrics.trueBitCount(); i++) {
+ int c = metrics.trueBitAt(i);
+ returnRecord.cols[c].set(bytes, offset, sizes[i]);
+ offset += sizes[i];
+ }
+ return returnRecord;
+ }
+ }
+
+ private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
+
+ private int[] gtDimsIdx;
+ private int[] gtMetricsIdx;
+ private Object[] result; // avoid object creation
+
+ StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
+ super(input);
+ this.gtDimsIdx = gtDimsIdx;
+ this.gtMetricsIdx = gtMetricsIdx;
+ result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+ }
+
+ private void decodeAndSetDimensions(GTRecord record) {
+ for (int i = 0; i < gtDimsIdx.length; i++) {
+ result[i] = record.decodeValue(gtDimsIdx[i]);
+ }
+ }
+
+ @Override
+ protected Object[] finalizeResult(GTRecord record) {
+ decodeAndSetDimensions(record);
+ // decode metrics
+ for (int i = 0; i < gtMetricsIdx.length; i++) {
+ result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+ }
+ return result;
+ }
+
+ @Override
+ protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
+ decodeAndSetDimensions(record);
+ // set metrics
+ for (int i = 0; i < gtMetricsIdx.length; i++) {
+ result[gtDimsIdx.length + i] = aggStates[i];
+ }
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
index 77cc2d8..1ae229a 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase {
CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready");
CubeSegment segment = cube.getFirstSegment();
- GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor()));
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor());
+ GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment));
GTInfo.serializer.serialize(info, buffer);
buffer.flip();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 998f1db..bb17054 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,10 +18,12 @@
package org.apache.kylin.storage;
+import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class StorageContext {
private boolean needStorageAggregation = false;
private boolean enableCoprocessor = false;
+ private boolean enableStreamAggregate = false;
+ private Comparator<GTRecord> groupKeyComparator;
+
private IStorageQuery storageQuery;
private AtomicLong processedRowCount = new AtomicLong();
private Cuboid cuboid;
@@ -230,4 +235,19 @@ public class StorageContext {
this.storageQuery = storageQuery;
}
+ /** @return whether partition results are to be stream-aggregated on the query server (false by default) */
+ public boolean isStreamAggregateEnabled() {
+ return this.enableStreamAggregate;
+ }
+
+ /** Switches stream aggregation of partition results on for this query. */
+ public void enableStreamAggregate() {
+ enableStreamAggregate = true;
+ }
+
+ /**
+ * @return comparator ordering {@code GTRecord}s by their group-by key, or {@code null} if none has been set
+ */
+ public Comparator<GTRecord> getGroupKeyComparator() {
+ return this.groupKeyComparator;
+ }
+
+ /** Registers the comparator used to order {@code GTRecord}s by their group-by key. */
+ public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
+ this.groupKeyComparator = groupKeyComparator;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 6911827..c3cc858 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.RecordComparators;
import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
@@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
Set<TblColRef> filterDims = Sets.newHashSet();
TupleFilter.collectColumns(filter, filterDims);
- this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+ this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment));
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
IGTComparator comp = gtInfo.getCodeSystem().getComparator();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 4f206d4..31a9f99 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
}
scanRequest = scanRangePlanner.planScanRequest();
String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
- scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
}
@Override
@@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner {
return scanRequest == null ? null : scanRequest.getInfo();
}
- public CubeSegment getSegment() {
- return this.cubeSeg;
+ /**
+ * @return the scan request planned for this segment; may be {@code null}, e.g. when the segment is skipped
+ */
+ public GTScanRequest getScanRequest() {
+ return this.scanRequest;
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 280718f..b762e5c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -43,7 +41,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
- * convert GTRecord to tuple
+ * Convert Object[] (decoded GTRecord) to tuple
*/
public class CubeTupleConverter implements ITupleConverter {
@@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter {
private final int[] gtColIdx;
private final int[] tupleIdx;
- private final Object[] gtValues;
private final MeasureType<?>[] measureTypes;
private final List<IAdvMeasureFiller> advMeasureFillers;
@@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter {
private final int nSelectedDims;
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx,
+ TupleInfo returnTupleInfo) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
+ this.gtColIdx = gtColIdx;
this.tupleInfo = returnTupleInfo;
this.derivedColFillers = Lists.newArrayList();
- List<TblColRef> cuboidDims = cuboid.getColumns();
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
nSelectedDims = selectedDimensions.size();
- gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
- gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
// measure types don't have this many, but aligned length make programming easier
measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
@@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter {
// pre-calculate dimension index mapping to tuple
for (TblColRef dim : selectedDimensions) {
- int dimIndex = mapping.getIndexOf(dim);
- gtColIdx[i] = dimIndex;
tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-
- // if (tupleIdx[iii] == -1) {
- // throw new IllegalStateException("dim not used in tuple:" + dim);
- // }
-
i++;
}
for (FunctionDesc metric : selectedMetrics) {
- int metricIndex = mapping.getIndexOf(metric);
- gtColIdx[i] = metricIndex;
-
if (metric.needRewrite()) {
String rewriteFieldName = metric.getRewriteFieldName();
tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
@@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter {
}
// prepare derived columns and filler
- Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+ Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
TblColRef[] hostCols = entry.getKey().data;
for (DeriveInfo deriveInfo : entry.getValue()) {
@@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter {
}
@Override
- public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
- record.getValues(gtColIdx, gtValues);
+ public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple) {
+ assert gtValues.length == gtColIdx.length;
// dimensions
for (int i = 0; i < nSelectedDims; i++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ecf1ad3..82590a2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,15 +26,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -120,6 +123,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// set limit push down
enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
+ // set whether to aggregate results from multiple partitions
+ enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
// set query deadline
context.setDeadline(cubeInstance);
@@ -144,8 +149,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
protected abstract String getGTStorage();
- protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) {
- return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ /**
+ * Creates the converter that turns decoded grid-table rows into query tuples. Protected so that
+ * subclasses can supply their own {@link ITupleConverter}.
+ *
+ * @param gtColIdx grid-table column index of each value position in a decoded row
+ * (dimensions first, then metrics)
+ */
+ protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid,
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics,
+ int[] gtColIdx, TupleInfo tupleInfo) {
+ ITupleConverter converter = new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
+ return converter;
}
protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -366,6 +371,35 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
+ /**
+ * Enables query-server stream aggregation of partition results when the cube config allows it,
+ * and registers the group-key comparator used to merge-sort the partition results.
+ *
+ * <p>Stream aggregation stays off when any shard-by column is among the group-by dimensions
+ * ({@code groupsD}) — presumably because rows of one group then land in a single shard, leaving
+ * nothing to merge across partitions — or when the query needs no storage aggregation.
+ */
+ private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) {
+ // named "desc" so it does not shadow the cubeDesc field of this class
+ CubeDesc desc = cuboid.getCubeDesc();
+ boolean enabled = desc.getConfig().isStreamAggregateEnabled();
+
+ // shard-by columns that also appear among the group-by dimensions
+ Set<TblColRef> shardByInGroups = Sets.newHashSet();
+ for (TblColRef col : desc.getShardByColumns()) {
+ if (groupsD.contains(col)) {
+ shardByInGroups.add(col);
+ }
+ }
+ if (!shardByInGroups.isEmpty()) {
+ enabled = false;
+ // placeholder form: the message is only rendered when debug logging is on
+ logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: {}", shardByInGroups);
+ }
+
+ if (!context.isNeedStorageAggregation()) {
+ enabled = false;
+ logger.debug("Aggregate partition results is not beneficial because no storage aggregation");
+ }
+
+ if (enabled) {
+ CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+ ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
+
+ context.enableStreamAggregate();
+ context.setGroupKeyComparator(GTRecord.getComparator(cols));
+ }
+ }
+
protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
Map<String, List<MeasureDesc>> map = Maps.newHashMap();
for (MeasureDesc measure : cubeDesc.getMeasures()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index 9c50d0c..dd48e4d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord;
import java.util.List;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.tuple.Tuple;
public interface ITupleConverter {
- public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple);
+
+ /**
+ * Populates {@code tuple} from one row of decoded grid-table values.
+ *
+ * @param gtValues decoded values; their order is defined by the implementation
+ * ({@code CubeTupleConverter} expects dimensions first, then metrics)
+ * @param tuple the tuple to fill in
+ * @return advanced measure fillers for the row, or {@code null}; callers such as
+ * {@code SegmentCubeTupleIterator} treat {@code null} as the simple case
+ */
+ List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
new file mode 100644
index 0000000..474e1e0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates the {@code GTRecord}s serialized consecutively in one storage partition's result bytes.
+ *
+ * <p>Every call to {@link #next()} returns the same mutable {@code GTRecord} instance, reloaded in
+ * place; callers that keep a record beyond the following call must copy it first.
+ */
+public class PartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+ private final ByteBuffer input;
+ private final ImmutableBitSet columns;
+ private final GTRecord current; // single instance reused for every row
+
+ public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) {
+ this.input = ByteBuffer.wrap(data);
+ this.columns = cols;
+ this.current = new GTRecord(info);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.position() < input.limit();
+ }
+
+ @Override
+ public GTRecord next() {
+ if (hasNext()) {
+ current.loadColumns(columns, input);
+ return current;
+ }
+ throw new NoSuchElementException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
new file mode 100644
index 0000000..52029d3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sort {@code GTRecord}s in all partitions, assuming each partition's records are already sorted
+ * by {@code comparator}.
+ *
+ * <p>Iterating consumes the underlying partition iterators, so an instance is effectively single-use:
+ * call {@link #iterator()} once.
+ */
+public class PartitionResultMerger implements Iterable<GTRecord> {
+ private final ImmutableList<PartitionResultIterator> partitionResults;
+ private final GTInfo info;
+ private final Comparator<GTRecord> comparator;
+
+ public PartitionResultMerger(
+ Iterable<PartitionResultIterator> partitionResults,
+ GTInfo info, Comparator<GTRecord> comparator) {
+ this.partitionResults = ImmutableList.copyOf(partitionResults);
+ this.info = info;
+ this.comparator = comparator;
+ }
+
+ /**
+ * Returns the records of all partitions in {@code comparator} order. A single partition is returned
+ * as-is, in which case {@code comparator} is never consulted.
+ */
+ @Override
+ public Iterator<GTRecord> iterator() {
+ if (partitionResults.size() == 1) {
+ return partitionResults.get(0);
+ }
+ return new MergingResultsIterator();
+ }
+
+ /** K-way merge: a min-heap holds every non-exhausted partition, keyed on its next record. */
+ private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
+ final GTRecord record = new GTRecord(info); // reuse to avoid object creation
+
+ PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+ MergingResultsIterator() {
+ Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+ @Override
+ public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+ return comparator.compare(o1.peek(), o2.peek());
+ }
+ };
+ // PriorityQueue rejects an initial capacity below 1; without the clamp an empty
+ // partition list would throw IllegalArgumentException instead of yielding no records
+ this.heap = new PriorityQueue<>(Math.max(1, partitionResults.size()), heapComparator);
+
+ for (PartitionResultIterator it : partitionResults) {
+ if (it.hasNext()) {
+ heap.offer(Iterators.peekingIterator(it));
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !heap.isEmpty();
+ }
+
+ @Override
+ public GTRecord next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // get smallest record
+ PeekingIterator<GTRecord> it = heap.poll();
+ // WATCH OUT! it.next() returns the partition's reused GTRecord, which is reloaded as soon as
+ // the partition is peeked again (e.g. by the heap comparator when re-offered below),
+ // so we must make a shallow copy of it first.
+ record.shallowCopyFrom(it.next());
+
+ if (it.hasNext()) {
+ heap.offer(it);
+ }
+
+ return record;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index 9e89227..fe22e9c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStorage;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ public class ScannerWorker {
private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
private IGTScanner internal = null;
- public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
+ public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
if (scanRequest == null) {
logger.info("Segment {} will be skipped", segment);
internal = new EmptyGTScanner();
@@ -48,7 +49,7 @@ public class ScannerWorker {
final GTInfo info = scanRequest.getInfo();
try {
- IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
+ IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
internal = rpc.getGTScanner(scanRequest);
} catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 37699a3..11f766c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -24,8 +24,14 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
+import com.google.common.collect.UnmodifiableIterator;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTStreamAggregateScanner;
+import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
protected final Tuple tuple;
protected final StorageContext context;
- protected Iterator<GTRecord> gtItr;
+ protected Iterator<Object[]> gtValues;
protected ITupleConverter cubeTupleConverter;
protected Tuple next;
@@ -66,12 +72,62 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
this.tupleInfo = returnTupleInfo;
this.tuple = new Tuple(returnTupleInfo);
this.context = context;
- this.gtItr = getGTItr(scanner);
- this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+
+ CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+ int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions);
+ int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics);
+ // gtColIdx = gtDimsIdx + gtMetricsIdx
+ int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length];
+ System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length);
+ System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length);
+
+ this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx);
+ this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(
+ scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
}
- private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
- return scanner.iterator();
+ /**
+ * Turns the scanned {@code GTRecord}s into decoded value rows laid out as the selected dimensions
+ * ({@code gtDimsIdx} order) followed by the selected metrics ({@code gtMetricsIdx} order).
+ *
+ * <p>When stream aggregation is enabled and the input is not a bare {@link PartitionResultIterator}
+ * (i.e. not a single partition's result), rows with equal group keys are merged by a
+ * {@link GTStreamAggregateScanner}; otherwise each record is decoded as-is. In both cases the
+ * returned iterator reuses one {@code Object[]} for every row.
+ */
+ private Iterator<Object[]> getGTValuesIterator(
+ final Iterator<GTRecord> records, final GTScanRequest scanRequest,
+ final int[] gtDimsIdx, final int[] gtMetricsIdx) {
+
+ boolean singlePartitionResult = records instanceof PartitionResultIterator;
+ // scanRequest is null when the segment was skipped (ScannerWorker then uses an EmptyGTScanner);
+ // the aggregator dereferences it, so only take the stream path when a request exists
+ if (scanRequest != null && context.isStreamAggregateEnabled() && !singlePartitionResult) {
+ // input records are ordered, leverage stream aggregator to produce possibly fewer records
+ IGTScanner inputScanner = new IGTScanner() {
+ @Override
+ public GTInfo getInfo() {
+ return scanRequest.getInfo();
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return records;
+ }
+ };
+ GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
+ inputScanner, scanRequest, context.getGroupKeyComparator());
+ return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
+ }
+
+ // simply decode records
+ return new UnmodifiableIterator<Object[]>() {
+ Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+
+ @Override
+ public boolean hasNext() {
+ return records.hasNext();
+ }
+
+ @Override
+ public Object[] next() {
+ GTRecord record = records.next();
+ for (int i = 0; i < gtDimsIdx.length; i++) {
+ result[i] = record.decodeValue(gtDimsIdx[i]);
+ }
+ for (int i = 0; i < gtMetricsIdx.length; i++) {
+ result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+ }
+ return result;
+ }
+ };
}
@Override
@@ -91,13 +147,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
}
// now we have a GTRecord
- if (!gtItr.hasNext()) {
+ if (!gtValues.hasNext()) {
return false;
}
- GTRecord curRecord = gtItr.next();
+ Object[] gtValues = this.gtValues.next();
// translate into tuple
- advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+ advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple);
// the simple case
if (advMeasureFillers == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 1a80bbf..0f1e191 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -18,22 +18,20 @@
package org.apache.kylin.storage.gtrecord;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
/**
* scatter the blob returned from region server to a iterable of gtrecords
@@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner {
private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
- private GTInfo info;
+ private final GTInfo info;
private IPartitionStreamer partitionStreamer;
- private Iterator<byte[]> blocks;
- private ImmutableBitSet columns;
- private int storagePushDownLimit = -1;
+ private final Iterator<byte[]> blocks;
+ private final ImmutableBitSet columns;
+ private final StorageContext context;
+ private final boolean needSorted; // whether scanner should return sorted records
- public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) {
- this.info = info;
+ /**
+ * @param scanRequest supplies the grid table info and the columns serialized in each partition result
+ * @param partitionStreamer source of the raw per-partition result blocks
+ * @param context decides, via the pushed-down limit and the stream aggregation flag, whether
+ * partition results have to be merged in sorted order
+ */
+ public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
+ this.info = scanRequest.getInfo();
this.partitionStreamer = partitionStreamer;
this.blocks = partitionStreamer.asByteArrayIterator();
- this.columns = columns;
- this.storagePushDownLimit = storagePushDownLimit;
+ this.context = context;
+ this.columns = scanRequest.getColumns();
+ boolean limitPushedDown = context.getFinalPushDownLimit() != Integer.MAX_VALUE;
+ this.needSorted = limitPushedDown || context.isStreamAggregateEnabled();
}
@Override
@@ -69,48 +69,18 @@ public class StorageResponseGTScatter implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
- if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
- logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
- return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
- } else {
- logger.info("Using Iterators.concat to merge partition results");
- return Iterators.concat(shardSubsets);
+ List<PartitionResultIterator> partitionResults = Lists.newArrayList();
+ while (blocks.hasNext()) {
+ partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
}
- }
-
- class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
- @Nullable
- @Override
- public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
- return new Iterator<GTRecord>() {
- private ByteBuffer inputBuffer = null;
- //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
- private GTRecord firstRecord = null;
-
- @Override
- public boolean hasNext() {
- if (inputBuffer == null) {
- inputBuffer = ByteBuffer.wrap(input);
- firstRecord = new GTRecord(info);
- }
- return inputBuffer.position() < inputBuffer.limit();
- }
-
- @Override
- public GTRecord next() {
- firstRecord.loadColumns(columns, inputBuffer);
- return firstRecord;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ if (!needSorted) {
+ logger.debug("Using Iterators.concat to merge partition results");
+ return Iterators.concat(partitionResults.iterator());
}
- }
+ logger.debug("Using PartitionResultMerger to merge partition results");
+ PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
+ return merger.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 82b67b6..e822ada 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private static ExecutorService executorService = new LoggableCachedThreadPool();
- public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
private byte[] getByteArrayForShort(short v) {
@@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
});
}
- return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit());
+ return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
}
private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 88e7176..db81646 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.IGTStorage;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage {
final protected Cuboid cuboid;
final protected GTInfo fullGTInfo;
final protected QueryContext queryContext;
+ final protected StorageContext storageContext;
final private RowKeyEncoder fuzzyKeyEncoder;
final private RowKeyEncoder fuzzyMaskEncoder;
- public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+ public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
this.queryContext = QueryContext.current();
+ this.storageContext = context;
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0fa57248/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 33f8d90..951e2ef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
}
- public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) {
- super(segment, cuboid, fullGTInfo);
+ public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
+ super(segment, cuboid, fullGTInfo, context);
}
@Override
[5/5] kylin git commit: KYLIN-2501 bugfix, pass IT
Posted by li...@apache.org.
KYLIN-2501 bugfix, pass IT
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/782a9748
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/782a9748
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/782a9748
Branch: refs/heads/KYLIN-2501
Commit: 782a97482ca1e92cf24e04badd8ee48c9b829f46
Parents: 0fa5724
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 22 16:31:45 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 31 16:39:35 2017 +0800
----------------------------------------------------------------------
.../gridtable/GTStreamAggregateScanner.java | 24 +++--
.../apache/kylin/storage/StorageContext.java | 12 ---
.../gtrecord/GTCubeStorageQueryBase.java | 7 --
.../storage/gtrecord/PartitionResultMerger.java | 100 -------------------
.../gtrecord/SegmentCubeTupleIterator.java | 7 +-
.../SortMergedPartitionResultIterator.java | 81 +++++++++++++++
.../gtrecord/StorageResponseGTScatter.java | 13 ++-
7 files changed, 108 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
index 1fde423..4eb5791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -18,6 +18,7 @@
package org.apache.kylin.gridtable;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.kylin.GTForwardingScanner;
@@ -38,11 +39,10 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private final GTScanRequest req;
private final Comparator<GTRecord> keyComparator;
- public GTStreamAggregateScanner(IGTScanner delegated,
- GTScanRequest req, Comparator<GTRecord> keyComparator) {
+ public GTStreamAggregateScanner(IGTScanner delegated, GTScanRequest scanRequest) {
super(delegated);
- this.req = req;
- this.keyComparator = keyComparator;
+ this.req = Preconditions.checkNotNull(scanRequest, "scanRequest");
+ this.keyComparator = GTRecord.getComparator(scanRequest.getAggrGroupBy());
}
@Override
@@ -172,14 +172,22 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
private int[] gtDimsIdx;
- private int[] gtMetricsIdx;
+ private int[] gtMetricsIdx; // specify which metric to return and their order
+ private int[] aggIdx; // specify the ith returning metric's aggStates index
+
private Object[] result; // avoid object creation
StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
super(input);
this.gtDimsIdx = gtDimsIdx;
this.gtMetricsIdx = gtMetricsIdx;
- result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+ this.aggIdx = new int[gtMetricsIdx.length];
+ for (int i = 0; i < aggIdx.length; i++) {
+ int metricIdx = gtMetricsIdx[i];
+ aggIdx[i] = metrics.trueBitIndexOf(metricIdx);
+ }
+
+ this.result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
}
private void decodeAndSetDimensions(GTRecord record) {
@@ -202,8 +210,8 @@ public class GTStreamAggregateScanner extends GTForwardingScanner {
protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
decodeAndSetDimensions(record);
// set metrics
- for (int i = 0; i < gtMetricsIdx.length; i++) {
- result[gtDimsIdx.length + i] = aggStates[i];
+ for (int i = 0; i < aggIdx.length; i++) {
+ result[gtDimsIdx.length + i] = aggStates[aggIdx[i]];
}
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index bb17054..4522261 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,12 +18,10 @@
package org.apache.kylin.storage;
-import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
import org.slf4j.Logger;
@@ -49,9 +47,7 @@ public class StorageContext {
private boolean exactAggregation = false;
private boolean needStorageAggregation = false;
private boolean enableCoprocessor = false;
-
private boolean enableStreamAggregate = false;
- private Comparator<GTRecord> groupKeyComparator;
private IStorageQuery storageQuery;
private AtomicLong processedRowCount = new AtomicLong();
@@ -242,12 +238,4 @@ public class StorageContext {
public void enableStreamAggregate() {
this.enableStreamAggregate = true;
}
-
- public Comparator<GTRecord> getGroupKeyComparator() {
- return groupKeyComparator;
- }
-
- public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
- this.groupKeyComparator = groupKeyComparator;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 82590a2..d91a0b4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,18 +26,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -392,11 +389,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (enabled) {
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
- ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
-
context.enableStreamAggregate();
- context.setGroupKeyComparator(GTRecord.getComparator(cols));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
deleted file mode 100644
index 52029d3..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.gtrecord;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-
-/**
- * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
- */
-public class PartitionResultMerger implements Iterable<GTRecord> {
- private final ImmutableList<PartitionResultIterator> partitionResults;
- private final GTInfo info;
- private final Comparator<GTRecord> comparator;
-
- public PartitionResultMerger(
- Iterable<PartitionResultIterator> partitionResults,
- GTInfo info, Comparator<GTRecord> comparator) {
- this.partitionResults = ImmutableList.copyOf(partitionResults);
- this.info = info;
- this.comparator = comparator;
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- if (partitionResults.size() == 1) {
- return partitionResults.get(0);
- }
- return new MergingResultsIterator();
- }
-
- private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
- final GTRecord record = new GTRecord(info); // reuse to avoid object creation
-
- PriorityQueue<PeekingIterator<GTRecord>> heap;
-
- MergingResultsIterator() {
- Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
- public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
- return comparator.compare(o1.peek(), o2.peek());
- }
- };
- this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
-
- for (PartitionResultIterator it : partitionResults) {
- if (it.hasNext()) {
- heap.offer(Iterators.peekingIterator(it));
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return !heap.isEmpty();
- }
-
- @Override
- public GTRecord next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- // get smallest record
- PeekingIterator<GTRecord> it = heap.poll();
- // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
- // so we must make a shallow copy of it.
- record.shallowCopyFrom(it.next());
-
- if (it.hasNext()) {
- heap.offer(it);
- }
-
- return record;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 11f766c..3bac5ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -90,8 +90,8 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
final Iterator<GTRecord> records, final GTScanRequest scanRequest,
final int[] gtDimsIdx, final int[] gtMetricsIdx) {
- boolean singlePartitionResult = records instanceof PartitionResultIterator;
- if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+ boolean hasMultiplePartitions = records instanceof SortMergedPartitionResultIterator;
+ if (hasMultiplePartitions && context.isStreamAggregateEnabled()) {
// input records are ordered, leverage stream aggregator to produce possibly fewer records
IGTScanner inputScanner = new IGTScanner() {
public GTInfo getInfo() {
@@ -104,8 +104,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
return records;
}
};
- GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
- inputScanner, scanRequest, context.getGroupKeyComparator());
+ GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest);
return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
new file mode 100644
index 0000000..21e61e3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortMergedPartitionResultIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sorts {@code GTRecord}s from all partitions into one ordered iterator; assumes each partition already yields records sorted by the given comparator.
+ */
+public class SortMergedPartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+    // k-way merge: a min-heap holds one peeking iterator per non-exhausted partition, ordered by that partition's next record.
+    final GTRecord record ; // reused and returned by every next() call to avoid object creation; callers must copy it to retain it
+    PriorityQueue<PeekingIterator<GTRecord>> heap; // partitions that still have records, smallest head record first
+    /** Seeds the heap with each partition that has at least one record; {@code comparator} must match the order each partition is sorted in. */
+    SortMergedPartitionResultIterator(
+            List<PartitionResultIterator> partitionResults,
+            GTInfo info, final Comparator<GTRecord> comparator) {
+        // ties between partitions come out in unspecified order: PriorityQueue breaks ties arbitrarily
+        this.record = new GTRecord(info);
+        Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+            public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        };
+        this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
+        // empty partitions are never added, so every iterator in the heap has a head record that peek() can return
+        for (PartitionResultIterator it : partitionResults) {
+            if (it.hasNext()) {
+                heap.offer(Iterators.peekingIterator(it));
+            }
+        }
+    }
+    /** Returns true while at least one partition still has unread records. */
+    @Override
+    public boolean hasNext() {
+        return !heap.isEmpty();
+    }
+    /** Returns the smallest remaining record; the returned object is reused and overwritten by the next call. */
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        // take the partition whose head record is smallest under the comparator
+        PeekingIterator<GTRecord> it = heap.poll();
+        // WATCH OUT! the record returned by PartitionResultIterator.next() may change later,
+        // so shallow-copy it into our reused record before the heap peeks this partition again.
+        record.shallowCopyFrom(it.next());
+        // re-insert the partition only if it still has records; exhausted partitions drop out of the merge
+        if (it.hasNext()) {
+            heap.offer(it);
+        }
+
+        return record;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/782a9748/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 0f1e191..f1ab20c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -44,7 +44,7 @@ public class StorageResponseGTScatter implements IGTScanner {
private IPartitionStreamer partitionStreamer;
private final Iterator<byte[]> blocks;
private final ImmutableBitSet columns;
- private final StorageContext context;
+ private final ImmutableBitSet groupByDims;
private final boolean needSorted; // whether scanner should return sorted records
public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
@@ -52,7 +52,7 @@ public class StorageResponseGTScatter implements IGTScanner {
this.partitionStreamer = partitionStreamer;
this.blocks = partitionStreamer.asByteArrayIterator();
this.columns = scanRequest.getColumns();
- this.context = context;
+ this.groupByDims = scanRequest.getAggrGroupBy();
this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
}
@@ -74,13 +74,16 @@ public class StorageResponseGTScatter implements IGTScanner {
partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
}
+ if (partitionResults.size() == 1) {
+ return partitionResults.get(0);
+ }
+
if (!needSorted) {
logger.debug("Using Iterators.concat to merge partition results");
return Iterators.concat(partitionResults.iterator());
}
- logger.debug("Using PartitionResultMerger to merge partition results");
- PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
- return merger.iterator();
+ logger.debug("Using SortMergedPartitionResultIterator to merge partition results");
+ return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
}
}
[2/5] kylin git commit: KYLIN-2525 skip signature check for older
version of cubes
Posted by li...@apache.org.
KYLIN-2525 skip signature check for older version of cubes
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6b4592f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6b4592f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6b4592f
Branch: refs/heads/KYLIN-2501
Commit: c6b4592fca047d1fc4094a0812774ddd2d8917f9
Parents: a776ef6
Author: Li Yang <li...@apache.org>
Authored: Wed Mar 29 11:57:10 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Mar 29 21:02:16 2017 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/cube/CubeManager.java | 8 ++++----
.../main/java/org/apache/kylin/cube/model/CubeDesc.java | 11 +++++++++--
2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6b4592f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 073f516..0a94fb2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -761,7 +761,7 @@ public class CubeManager implements IRealizationProvider {
CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName());
checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName);
- if (!isITTestCube(cubeName))
+ if (!isSpecialTestCube(cubeName))
checkState(cubeDesc.getName().equals(cubeName), "cube name '%s' must be same as descriptor name '%s', but it is not", cubeName, cubeDesc.getName());
if (!cubeDesc.getError().isEmpty()) {
@@ -791,9 +791,9 @@ public class CubeManager implements IRealizationProvider {
}
}
- private boolean isITTestCube(String cubeName) {
- return config.isDevEnv() //
- && (cubeName.startsWith("test_kylin_cube") || cubeName.startsWith("test_streaming"));
+ private boolean isSpecialTestCube(String cubeName) {
+ return cubeName.equals("kylin_sales_cube") //
+ || config.isDevEnv() && (cubeName.startsWith("test_kylin_cube") || cubeName.startsWith("test_streaming"));
}
private MetadataManager getMetadataManager() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6b4592f/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index c1469fe..b391055 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -479,8 +479,15 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
return true;
}
- if (KylinVersion.getCurrentVersion().isCompatibleWith(new KylinVersion(getVersion())) && !KylinVersion.getCurrentVersion().isSignatureCompatibleWith(new KylinVersion(getVersion()))) {
- logger.info("checkSignature on {} is skipped as the its version is {} (not signature compatible but compatible) ", getName(), getVersion());
+ KylinVersion cubeVersion = new KylinVersion(getVersion());
+ KylinVersion kylinVersion = KylinVersion.getCurrentVersion();
+ if (!kylinVersion.isCompatibleWith(cubeVersion)) {
+ logger.info("checkSignature on {} is skipped as the its version {} is different from kylin version {}", getName(), cubeVersion, kylinVersion);
+ return true;
+ }
+
+ if (kylinVersion.isCompatibleWith(cubeVersion) && !kylinVersion.isSignatureCompatibleWith(cubeVersion)) {
+ logger.info("checkSignature on {} is skipped as the its version is {} (not signature compatible but compatible) ", getName(), cubeVersion);
return true;
}
[3/5] kylin git commit: KYLIN-2525 tolerate job metadata exception in
StorageCleanupJob
Posted by li...@apache.org.
KYLIN-2525 tolerate job metadata exception in StorageCleanupJob
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fa3ee3ff
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fa3ee3ff
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fa3ee3ff
Branch: refs/heads/KYLIN-2501
Commit: fa3ee3ffbbb11ac3d4dc79dbb719f56a7e913857
Parents: c6b4592
Author: Li Yang <li...@apache.org>
Authored: Wed Mar 29 18:03:59 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Mar 29 21:02:23 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/tool/StorageCleanupJob.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/fa3ee3ff/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 477c58a..f1a3ebe 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -46,6 +46,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -57,7 +58,6 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.source.hive.IHiveClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -276,9 +276,14 @@ public class StorageCleanupJob extends AbstractApplication {
sb.append(jobId).append("(").append(state).append("), ");
}
- String segmentId = getSegmentIdFromJobId(jobId);
- if (segmentId != null) {//some jobs are not cubing jobs
- segmentId2JobId.put(segmentId, jobId);
+ try {
+ String segmentId = getSegmentIdFromJobId(jobId);
+ if (segmentId != null) {//some jobs are not cubing jobs
+ segmentId2JobId.put(segmentId, jobId);
+ }
+ } catch (Exception ex) {
+ logger.warn("Failed to find segment ID from job ID " + jobId + ", ignore it");
+ // some older version job metadata may fail to read, ignore it
}
}
logger.info("Working jobIDs: " + workingJobList);