You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:26 UTC
[12/31] incubator-kylin git commit: KYLIN-1068 support topn in v2 CubeStorageQuery
KYLIN-1068 support topn in v2 CubeStorageQuery
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b0e30966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b0e30966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b0e30966
Branch: refs/heads/KYLIN-1112
Commit: b0e3096662487f1114a8dda49c70ea70d75ec649
Parents: 279226f
Author: shaofengshi <sh...@apache.org>
Authored: Wed Oct 28 21:39:13 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:16 2015 +0800
----------------------------------------------------------------------
query/src/test/resources/query/sql/query82.sql | 27 ++++++++
.../resources/query/sql/query82.sql.disable | 27 --------
.../storage/hbase/cube/v2/CubeStorageQuery.java | 62 ++++++++++++++++++
.../hbase/cube/v2/CubeTupleConverter.java | 61 +++++++++++++++++-
.../v2/SequentialCubeTopNTupleIterator.java | 67 ++++++++++++++++++++
.../cube/v2/SequentialCubeTupleIterator.java | 28 ++++----
6 files changed, 230 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..57e9de0
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_kylin_fact.cal_dt, seller_id
+ FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ group by
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by sum(test_kylin_fact.price) desc limit 20
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/query/src/test/resources/query/sql/query82.sql.disable
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql.disable b/query/src/test/resources/query/sql/query82.sql.disable
deleted file mode 100644
index 57e9de0..0000000
--- a/query/src/test/resources/query/sql/query82.sql.disable
+++ /dev/null
@@ -1,27 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-select test_kylin_fact.cal_dt, seller_id
- FROM test_kylin_fact
- inner JOIN edw.test_cal_dt as test_cal_dt
- ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
- inner JOIN test_category_groupings
- ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
- AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
- group by
- test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by sum(test_kylin_fact.price) desc limit 20
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 1d9ee3a..8317403 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -46,15 +46,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
+ private final Collection<TblColRef> topNColumns;
public CubeStorageQuery(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
+ this.topNColumns = Lists.newArrayList();
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isTopN()) {
+ List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+ topNColumns.add(colRefs.get(colRefs.size() - 1));
+ }
+ }
}
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ // If the query groups by a TopN column, rewrite its single SUM aggregation to the matching TopN measure.
+ checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+ // NOTE: groups aliases sqlDigest.groupbyColumns, so the remove() below also mutates the digest itself.
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNCol = extractTopNCol(groups);
+ if (topNCol != null)
+ groups.remove(topNCol);
+
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
@@ -110,6 +125,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
if (scanners.isEmpty())
return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+ if (topNCol != null)
+ return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context);
+
return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
}
@@ -375,4 +393,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
return false;
}
+ // Replaces sqlDigest.aggregations in place with the TopN measure function when grouping by a TopN column.
+ private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNDisplayCol = extractTopNCol(groups);
+ boolean hasTopN = topNDisplayCol != null;
+
+ if (!hasTopN)
+ return;
+
+ if (sqlDigest.aggregations.size() != 1) {
+ throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+ }
+
+ FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+ if (!functionDesc.isSum()) {
+ throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+ }
+
+ FunctionDesc rewriteFunction = null;
+ // only a TopN measure whose display column matches the grouped column may replace the SUM
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isTopN() && measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
+ rewriteFunction = measureDesc.getFunction();
+ break;
+ }
+ }
+
+ if (rewriteFunction == null) {
+ throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+ }
+
+ sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+ logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+ }
+ // Returns the first column of colRefs that is in topNColumns (last parameter column of a TopN measure), or null.
+ private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
+ for (TblColRef colRef : colRefs) {
+ if (topNColumns.contains(colRef)) {
+ return colRef;
+ }
+ }
+
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
index fb54cce..2354a88 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
@@ -18,17 +18,24 @@
package org.apache.kylin.storage.hbase.cube.v2;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -52,13 +59,18 @@ public class CubeTupleConverter {
final int[] tupleIdx;
final Object[] tmpValues;
final int nSelectedDims;
+ final TblColRef topNCol;
+ int topNColTupleIdx;
+ int topNMeasureTupleIdx;
+ Dictionary<String> topNColDict;
public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
- Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+ Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, TblColRef topNCol) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = returnTupleInfo;
this.derivedColFillers = Lists.newArrayList();
+ this.topNCol = topNCol;
List<TblColRef> cuboidDims = cuboid.getColumns();
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
@@ -93,6 +105,13 @@ public class CubeTupleConverter {
iii++;
}
+ if (this.topNCol != null) {
+ this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+ this.topNMeasureTupleIdx = nSelectedDims;
+ // dictionary used by TopNCounterTupleIterator to decode counter keys back into column values
+ this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+ }
+
// prepare derived columns and filler
Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(cuboidDims, null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -132,6 +151,12 @@ public class CubeTupleConverter {
}
}
+ public Iterator<Tuple> translateTopNResult(GTRecord record, Tuple tuple) {
+ translateResult(record, tuple);
+ Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
+ if (!(topNCounterObj instanceof TopNCounter)) throw new IllegalStateException("Expected a TopNCounter at tuple index " + topNMeasureTupleIdx + " but got: " + topNCounterObj);
+ return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
+ }
private interface IDerivedColumnFiller {
public void fillDerivedColumns(Object[] tmpValues, Tuple tuple);
}
@@ -221,4 +246,38 @@ public class CubeTupleConverter {
private static String toString(Object o) {
return o == null ? null : o.toString();
}
+
+ private class TopNCounterTupleIterator implements Iterator<Tuple> {
+ // one shared Tuple instance, refilled and returned by every call to next()
+ private Tuple tuple;
+ private Iterator<Counter> topNCounterIterator;
+ private Counter<ByteArray> counter;
+
+ private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+ this.tuple = tuple;
+ this.topNCounterIterator = topNCounter.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return topNCounterIterator.hasNext();
+ }
+ // decodes the next counter's key via topNColDict and writes it, plus its count, into the shared tuple
+ @Override
+ public Tuple next() {
+ counter = topNCounterIterator.next();
+ int key = BytesUtil.readUnsigned(counter.getItem().array(), counter.getItem().offset(), counter.getItem().length());
+ String colValue = topNColDict.getValueFromId(key);
+ if (topNColTupleIdx >= 0) tuple.setDimensionValue(topNColTupleIdx, colValue); // -1: column not in returned tuple
+ tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
+
+ return tuple;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
new file mode 100644
index 0000000..aa50d00
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
@@ -0,0 +1,67 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Sequential iterator for TopN queries: each GTRecord scanned from a cube segment is expanded by
+ * CubeTupleConverter#translateTopNResult into one tuple per entry of its TopN counter.
+ */
+public class SequentialCubeTopNTupleIterator extends SequentialCubeTupleIterator {
+
+    /** TopN column handed to every CubeTupleConverter so it can locate and decode that column. */
+    private final TblColRef topNCol;
+    /** Tuples expanded from the current GTRecord; null when the next record must be fetched. */
+    private Iterator<Tuple> innerResultIterator;
+
+    public SequentialCubeTopNTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            TblColRef topNCol, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+        super(scanners, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context);
+        this.topNCol = topNCol; // must be kept: converters built in advanceToNextRecord() depend on it
+    }
+
+    @Override
+    public boolean hasNext() {
+        while (next == null) { // a loop, not recursion, so long runs of empty counters/segments can't overflow the stack
+            if (innerResultIterator != null && innerResultIterator.hasNext()) {
+                next = innerResultIterator.next();
+            } else if (!advanceToNextRecord()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Expands the next GTRecord into innerResultIterator, closing a drained segment scanner and opening
+     * the next one as needed. Returns false once every scanner is exhausted.
+     */
+    private boolean advanceToNextRecord() {
+        innerResultIterator = null;
+        while (true) {
+            if (curScanner == null) {
+                if (!scannerIterator.hasNext())
+                    return false;
+                curScanner = scannerIterator.next();
+                curRecordIterator = curScanner.iterator();
+                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, topNCol);
+            }
+            if (curRecordIterator.hasNext()) {
+                innerResultIterator = curTupleConverter.translateTopNResult(curRecordIterator.next(), tuple);
+                return true;
+            }
+            close(curScanner);
+            curScanner = null;
+            curRecordIterator = null;
+            curTupleConverter = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 85aa54a..ddaafd5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -22,18 +22,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
- private final Cuboid cuboid;
- private final Set<TblColRef> selectedDimensions;
- private final Set<FunctionDesc> selectedMetrics;
- private final TupleInfo tupleInfo;
- private final Tuple tuple;
- private final Iterator<CubeSegmentScanner> scannerIterator;
- private final StorageContext context;
-
- private CubeSegmentScanner curScanner;
- private Iterator<GTRecord> curRecordIterator;
- private CubeTupleConverter curTupleConverter;
- private Tuple next;
+ protected final Cuboid cuboid;
+ protected final Set<TblColRef> selectedDimensions;
+ protected final Set<FunctionDesc> selectedMetrics;
+ protected final TupleInfo tupleInfo;
+ protected final Tuple tuple;
+ protected final Iterator<CubeSegmentScanner> scannerIterator;
+ protected final StorageContext context;
+ // Per-segment iteration state; protected so SequentialCubeTopNTupleIterator can drive its own hasNext().
+ protected CubeSegmentScanner curScanner;
+ protected Iterator<GTRecord> curRecordIterator;
+ protected CubeTupleConverter curTupleConverter;
+ protected Tuple next;
private int scanCount;
private int scanCountDelta;
@@ -58,7 +58,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
if (scannerIterator.hasNext()) {
curScanner = scannerIterator.next();
curRecordIterator = curScanner.iterator();
- curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+ curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, null);
} else {
return false;
}
@@ -112,7 +112,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
}
}
- private void close(CubeSegmentScanner scanner) {
+ protected void close(CubeSegmentScanner scanner) {
try {
scanner.close();
} catch (IOException e) {