You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:26 UTC

[12/31] incubator-kylin git commit: KYLIN-1068 support topn in v2 CubeStorageQuery

KYLIN-1068 support topn in v2 CubeStorageQuery


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b0e30966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b0e30966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b0e30966

Branch: refs/heads/KYLIN-1112
Commit: b0e3096662487f1114a8dda49c70ea70d75ec649
Parents: 279226f
Author: shaofengshi <sh...@apache.org>
Authored: Wed Oct 28 21:39:13 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:16 2015 +0800

----------------------------------------------------------------------
 query/src/test/resources/query/sql/query82.sql  | 27 ++++++++
 .../resources/query/sql/query82.sql.disable     | 27 --------
 .../storage/hbase/cube/v2/CubeStorageQuery.java | 62 ++++++++++++++++++
 .../hbase/cube/v2/CubeTupleConverter.java       | 61 +++++++++++++++++-
 .../v2/SequentialCubeTopNTupleIterator.java     | 67 ++++++++++++++++++++
 .../cube/v2/SequentialCubeTupleIterator.java    | 28 ++++----
 6 files changed, 230 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..57e9de0
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select  test_kylin_fact.cal_dt, seller_id
+  FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+ group by 
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by sum(test_kylin_fact.price) desc limit 20

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/query/src/test/resources/query/sql/query82.sql.disable
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql.disable b/query/src/test/resources/query/sql/query82.sql.disable
deleted file mode 100644
index 57e9de0..0000000
--- a/query/src/test/resources/query/sql/query82.sql.disable
+++ /dev/null
@@ -1,27 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-select  test_kylin_fact.cal_dt, seller_id
-  FROM test_kylin_fact
- inner JOIN edw.test_cal_dt as test_cal_dt
- ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
- inner JOIN test_category_groupings
- ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
- AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
- group by 
- test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by sum(test_kylin_fact.price) desc limit 20

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 1d9ee3a..8317403 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -46,15 +46,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
     private final CubeInstance cubeInstance;
     private final CubeDesc cubeDesc;
+    private Collection<TblColRef> topNColumns;
 
     public CubeStorageQuery(CubeInstance cube) {
         this.cubeInstance = cube;
         this.cubeDesc = cube.getDescriptor();
+        this.topNColumns = Lists.newArrayList(); // one entry per TopN measure: its last parameter column
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) {
+                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+                topNColumns.add(colRefs.get(colRefs.size() - 1)); // matched against group-by columns by extractTopNCol()
+            }
+        }
     }
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        // check whether this is a TopN query;
+        checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TblColRef topNCol = extractTopNCol(groups);
+        if (topNCol != null)
+            groups.remove(topNCol);
+
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
@@ -110,6 +125,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         if (scanners.isEmpty())
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
 
+        if (topNCol != null)
+            return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context);
+        
         return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
     }
 
@@ -375,4 +393,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return false;
     }
 
+    /**
+     * If a group-by column is a TopN column of this cube, rewrites the query's single SUM aggregation
+     * into the cube's matching TopN measure; otherwise leaves {@code sqlDigest} untouched.
+     *
+     * @throws IllegalStateException for more than one aggregation, a non-SUM aggregation, or no matching TopN measure
+     */
+    private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        TblColRef topNDisplayCol = extractTopNCol(sqlDigest.groupbyColumns);
+        if (topNDisplayCol == null)
+            return;
+
+        if (sqlDigest.aggregations.size() != 1)
+            throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+
+        FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+        if (!functionDesc.isSum())
+            throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+
+        // replace the SUM with a TopN measure (only TopN measures qualify) whose display column is the grouped one
+        FunctionDesc rewriteFunction = null;
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            FunctionDesc candidate = measureDesc.getFunction();
+            if (candidate.isTopN() && candidate.isCompatible(functionDesc) //
+                    && topNDisplayCol.getName().equalsIgnoreCase(candidate.getParameter().getDisplayColumn())) {
+                rewriteFunction = candidate;
+                break;
+            }
+        }
+        if (rewriteFunction == null)
+            throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+
+        sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+        logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+    }
+
+    /** Returns the first of {@code colRefs} that is the last parameter column of one of this cube's TopN measures, or null. */
+    private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
+        for (TblColRef candidate : colRefs) {
+            if (topNColumns.contains(candidate))
+                return candidate;
+        }
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
index fb54cce..2354a88 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -52,13 +59,18 @@ public class CubeTupleConverter {
     final int[] tupleIdx;
     final Object[] tmpValues;
     final int nSelectedDims;
+    final TblColRef topNCol;
+    int topNColTupleIdx;
+    int topNMeasureTupleIdx;
+    Dictionary<String> topNColDict;
 
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, TblColRef topNCol) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = returnTupleInfo;
         this.derivedColFillers = Lists.newArrayList();
+        this.topNCol = topNCol;
 
         List<TblColRef> cuboidDims = cuboid.getColumns();
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
@@ -93,6 +105,13 @@ public class CubeTupleConverter {
             iii++;
         }
 
+        if (this.topNCol != null) {
+            this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+            this.topNMeasureTupleIdx = nSelectedDims;
+
+            this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+        }
+
         // prepare derived columns and filler
         Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(cuboidDims, null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -132,6 +151,12 @@ public class CubeTupleConverter {
         }
     }
 
+    public Iterator<Tuple> translateTopNResult(GTRecord record, Tuple tuple) {
+        translateResult(record, tuple); // expected to leave a TopNCounter at topNMeasureTupleIdx
+        Object[] values = tuple.getAllValues();
+        assert values[topNMeasureTupleIdx] instanceof TopNCounter;
+        return new TopNCounterTupleIterator(tuple, (TopNCounter) values[topNMeasureTupleIdx]);
+    }
     private interface IDerivedColumnFiller {
         public void fillDerivedColumns(Object[] tmpValues, Tuple tuple);
     }
@@ -221,4 +246,38 @@ public class CubeTupleConverter {
     private static String toString(Object o) {
         return o == null ? null : o.toString();
     }
+
+    /** Expands one TopN counter into tuples: one (top-N column value, count) pair per counter entry. */
+    private class TopNCounterTupleIterator implements Iterator<Tuple> {
+        private final Tuple tuple; // the same tuple instance is re-filled and returned by every next()
+        private final Iterator<Counter<ByteArray>> topNCounterIterator;
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+            this.tuple = tuple;
+            this.topNCounterIterator = topNCounter.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return topNCounterIterator.hasNext();
+        }
+
+        @Override
+        public Tuple next() {
+            Counter<ByteArray> counter = topNCounterIterator.next();
+            ByteArray item = counter.getItem(); // dictionary id bytes; honor the slice's offset/length
+            int key = BytesUtil.readUnsigned(item.array(), item.offset(), item.length());
+            if (topNColTupleIdx >= 0) // -1 when the top-N column is not part of the returned tuple
+                tuple.setDimensionValue(topNColTupleIdx, topNColDict.getValueFromId(key));
+            tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
+            return tuple;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
new file mode 100644
index 0000000..aa50d00
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
@@ -0,0 +1,67 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Sequentially scans cube segments for a TopN query, expanding the TopN counter of every scanned
+ * record into one tuple per (top-N column value, count) entry.
+ * <p>
+ * Every returned tuple is the same shared {@link Tuple} instance, re-filled on each step.
+ */
+public class SequentialCubeTopNTupleIterator extends SequentialCubeTupleIterator {
+
+    private final TblColRef topNCol;
+    private Iterator<Tuple> innerResultIterator; // tuples expanded from the current record, or null
+
+    public SequentialCubeTopNTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            TblColRef topNCol, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+        super(scanners, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context);
+        this.topNCol = topNCol; // required by the per-segment CubeTupleConverter built in advanceToNextScanner()
+    }
+
+    @Override
+    public boolean hasNext() {
+        // iterate rather than recurse, so long runs of empty segments or counters cannot exhaust the stack
+        while (next == null) {
+            if (innerResultIterator != null && innerResultIterator.hasNext()) {
+                next = innerResultIterator.next();
+            } else if (curRecordIterator != null && curRecordIterator.hasNext()) {
+                innerResultIterator = curTupleConverter.translateTopNResult(curRecordIterator.next(), tuple);
+            } else if (!advanceToNextScanner()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Closes the current segment scanner, if any, and opens the next one with a fresh converter.
+     *
+     * @return false when there are no more scanners
+     */
+    private boolean advanceToNextScanner() {
+        innerResultIterator = null;
+        if (curScanner != null) {
+            close(curScanner);
+            curScanner = null;
+            curRecordIterator = null;
+            curTupleConverter = null;
+        }
+        if (!scannerIterator.hasNext()) {
+            return false;
+        }
+        curScanner = scannerIterator.next();
+        curRecordIterator = curScanner.iterator();
+        curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, topNCol);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b0e30966/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 85aa54a..ddaafd5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -22,18 +22,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
 
     private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
 
-    private final Cuboid cuboid;
-    private final Set<TblColRef> selectedDimensions;
-    private final Set<FunctionDesc> selectedMetrics;
-    private final TupleInfo tupleInfo;
-    private final Tuple tuple;
-    private final Iterator<CubeSegmentScanner> scannerIterator;
-    private final StorageContext context;
-
-    private CubeSegmentScanner curScanner;
-    private Iterator<GTRecord> curRecordIterator;
-    private CubeTupleConverter curTupleConverter;
-    private Tuple next;
+    protected final Cuboid cuboid;
+    protected final Set<TblColRef> selectedDimensions;
+    protected final Set<FunctionDesc> selectedMetrics;
+    protected final TupleInfo tupleInfo;
+    protected final Tuple tuple;
+    protected final Iterator<CubeSegmentScanner> scannerIterator;
+    protected final StorageContext context;
+
+    protected CubeSegmentScanner curScanner;
+    protected Iterator<GTRecord> curRecordIterator;
+    protected CubeTupleConverter curTupleConverter;
+    protected Tuple next;
 
     private int scanCount;
     private int scanCountDelta;
@@ -58,7 +58,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
             if (scannerIterator.hasNext()) {
                 curScanner = scannerIterator.next();
                 curRecordIterator = curScanner.iterator();
-                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, null);
             } else {
                 return false;
             }
@@ -112,7 +112,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         }
     }
 
-    private void close(CubeSegmentScanner scanner) {
+    protected void close(CubeSegmentScanner scanner) {
         try {
             scanner.close();
         } catch (IOException e) {