You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/09/21 04:16:11 UTC
[05/13] incubator-kylin git commit: KYLIN-943 update query/storage engine to support TopN
KYLIN-943 update query/storage engine to support TopN
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/988c2e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/988c2e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/988c2e76
Branch: refs/heads/KYLIN-943
Commit: 988c2e76f4d57b3e14c4b548d7c0959aeffa4924
Parents: 97de708
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 17 14:05:06 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 21 10:11:54 2015 +0800
----------------------------------------------------------------------
.../kylin/job/BuildCubeWithEngineTest.java | 6 +-
.../apache/kylin/common/topn/TopNCounter.java | 55 ++++++++++--
.../kylin/cube/CubeCapabilityChecker.java | 60 +++++++++++--
.../apache/kylin/cube/CubeDimensionDeriver.java | 10 ++-
.../org/apache/kylin/cube/model/CubeDesc.java | 6 +-
.../kylin/metadata/model/FunctionDesc.java | 14 ++-
.../kylin/metadata/realization/SQLDigest.java | 13 ++-
.../apache/kylin/storage/StorageContext.java | 15 +---
.../kylin/storage/hybrid/HybridInstance.java | 4 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 3 +
.../kylin/storage/cache/DynamicCacheTest.java | 4 +-
.../kylin/storage/cache/StaticCacheTest.java | 4 +-
.../cube_desc/test_kylin_cube_topn_desc.json | 2 +-
.../localmeta/data/DEFAULT.STREAMING_TABLE.csv | 0
.../test_kylin_inner_join_model_desc.json | 3 +-
.../test_kylin_left_join_model_desc.json | 3 +-
.../apache/kylin/query/relnode/OLAPContext.java | 18 +++-
.../kylin/query/relnode/OLAPLimitRel.java | 1 +
.../apache/kylin/query/relnode/OLAPSortRel.java | 11 +--
.../kylin/query/test/ITKylinQueryTest.java | 2 +-
query/src/test/resources/query/sql/query81.sql | 26 ++++++
query/src/test/resources/query/sql/query82.sql | 26 ++++++
.../cube/v1/CubeSegmentTopNTupleIterator.java | 86 ++++++++++++++++++
.../hbase/cube/v1/CubeSegmentTupleIterator.java | 35 ++++----
.../storage/hbase/cube/v1/CubeStorageQuery.java | 92 +++++++++++++++-----
.../hbase/cube/v1/CubeTupleConverter.java | 71 ++++++++++++++-
.../cube/v1/SerializedHBaseTupleIterator.java | 9 +-
.../storage/hbase/common/ITStorageTest.java | 4 +-
28 files changed, 496 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 7c6b028..4564ccc 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -120,13 +120,13 @@ public class BuildCubeWithEngineTest {
@Test
public void test() throws Exception {
DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
- testInner();
+// testInner();
testLeft();
}
private void testInner() throws Exception {
- String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
-// String[] testCase = new String[] { "testInnerJoinTopNCube" };
+ String[] testCase = new String[] { "testInnerJoinTopNCube" };
+ // String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
runTestAndAssertSucceed(testCase);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 69e8d56..6814b8d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -23,10 +23,7 @@ import org.apache.kylin.common.util.Pair;
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
@@ -38,10 +35,11 @@ import java.util.Map;
*
* @param <T> type of data in the stream to be summarized
*/
-public class TopNCounter<T> implements ITopK<T> {
+public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
public static final int EXTRA_SPACE_RATE = 50;
+
protected class Bucket {
protected DoublyLinkedList<Counter<T>> counterList;
@@ -365,4 +363,51 @@ public class TopNCounter<T> implements ITopK<T> {
return items;
}
+
+ @Override
+ public Iterator<Counter<T>> iterator() {
+ return new TopNCounterIterator();
+ }
+
+ private class TopNCounterIterator implements Iterator {
+
+ private ListNode2<Bucket> currentBNode;
+ private Iterator<Counter<T>> currentCounterIterator;
+
+ private TopNCounterIterator() {
+ currentBNode = bucketList.head();
+ if (currentBNode != null && currentBNode.getValue() != null) {
+ currentCounterIterator = currentBNode.getValue().counterList.iterator();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentCounterIterator == null) {
+ return false;
+ }
+
+ if (currentCounterIterator.hasNext()) {
+ return true;
+ }
+
+ currentBNode = currentBNode.getPrev();
+
+ if (currentBNode == null)
+ return false;
+
+ currentCounterIterator = currentBNode.getValue().counterList.iterator();
+ return hasNext();
+ }
+
+ @Override
+ public Counter<T> next() {
+ return currentCounterIterator.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 77c3298..628340e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,19 +18,20 @@
package org.apache.kylin.cube;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
*/
public class CubeCapabilityChecker {
@@ -39,10 +40,11 @@ public class CubeCapabilityChecker {
public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
// retrieve members from olapContext
- Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest.groupbyColumns, digest.filterColumns);
+ Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
Collection<FunctionDesc> functions = digest.aggregations;
Collection<TblColRef> metricsColumns = digest.metricColumns;
Collection<JoinDesc> joins = digest.joinDescs;
+ boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
// match dimensions & aggregations & joins
@@ -62,6 +64,13 @@ public class CubeCapabilityChecker {
}
}
+ // for topn, the group column can come from measure
+ if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
+ boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
+ matchDimensions = matchedTopN;
+ matchAggregation = matchedTopN;
+ }
+
if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
return false;
@@ -70,6 +79,47 @@ public class CubeCapabilityChecker {
return true;
}
+ private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
+
+ CubeDesc cubeDesc = cube.getDescriptor();
+ List<FunctionDesc> cubeFunctions = cubeDesc.listAllFunctions();
+ Collection<FunctionDesc> functions = digest.aggregations;
+ Collection<MeasureDesc> sortMeasures = digest.sortMeasures;
+ Collection<SQLDigest.OrderEnum> sortOrders = digest.sortOrders;
+
+ FunctionDesc onlyFunction = functions.iterator().next();
+ if (onlyFunction.isSum() == false) {
+ // topN only support SUM expression
+ return false;
+ }
+
+ Collection<TblColRef> dimensionColumnsCopy = new ArrayList<TblColRef>(dimensionColumns);
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ if (measure.getFunction().isTopN()) {
+ List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
+ TblColRef displayCol = cols.get(cols.size() - 1);
+ dimensionColumnsCopy.remove(displayCol);
+ if(isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
+ if (measure.getFunction().isCompatible(onlyFunction)) {
+ return true;
+ }
+ }
+ dimensionColumnsCopy.add(displayCol);
+ }
+ }
+
+ return false;
+ }
+
+ private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isTopN())
+ return true;
+ }
+
+ return false;
+ }
+
private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
CubeDesc cubeDesc = cube.getDescriptor();
boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
index a746c99..138d01e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
@@ -21,7 +21,9 @@ package org.apache.kylin.cube;
import java.util.Collection;
import java.util.HashSet;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
/**
*
@@ -29,7 +31,13 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class CubeDimensionDeriver {
- public static Collection<TblColRef> getDimensionColumns(Collection<TblColRef> groupByColumns, Collection<TblColRef> filterColumns) {
+ public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+ Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+ Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
+
+ Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
+ Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
+
Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
dimensionColumns.addAll(groupByColumns);
dimensionColumns.addAll(filterColumns);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 90b5474..9cbdfae 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+
private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -827,5 +828,8 @@ public class CubeDesc extends RootPersistentEntity {
}
return result;
}
-
+
+ public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
+ return measureDisplayColumns;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index b87d50c..d10f395 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -60,7 +60,7 @@ public class FunctionDesc {
}
public boolean needRewrite() {
- return !isSum() && !isDimensionAsMetric();
+ return !isSum() && !isDimensionAsMetric() && !isTopN();
}
public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -225,4 +225,16 @@ public class FunctionDesc {
return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
}
+ public boolean isCompatible(FunctionDesc another) {
+ if (another == null) {
+ return false;
+ }
+
+ if (this.isTopN() && another.isSum()) {
+ if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 7811858..e48cebe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,15 +19,22 @@
package org.apache.kylin.metadata.realization;
import java.util.Collection;
+import java.util.List;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
/**
*/
public class SQLDigest {
+
+ public enum OrderEnum {
+ ASCENDING, DESCENDING
+ }
+
public String factTable;
public TupleFilter filter;
public Collection<JoinDesc> joinDescs;
@@ -36,9 +43,11 @@ public class SQLDigest {
public Collection<TblColRef> filterColumns;
public Collection<TblColRef> metricColumns;
public Collection<FunctionDesc> aggregations;
+ public Collection<MeasureDesc> sortMeasures;
+ public Collection<OrderEnum> sortOrders;
public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
- Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc) {
+ Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
this.factTable = factTable;
this.filter = filter;
this.joinDescs = joinDescs;
@@ -47,6 +56,8 @@ public class SQLDigest {
this.filterColumns = filterColumns;
this.metricColumns = aggregatedColumns;
this.aggregations = aggregateFunnc;
+ this.sortMeasures = sortMeasures;
+ this.sortOrders = sortOrders;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 8b1b706..1643aa4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.realization.SQLDigest;
/**
* @author xjiang
@@ -32,17 +33,12 @@ public class StorageContext {
public static final int DEFAULT_THRESHOLD = 1000000;
- public enum OrderEnum {
- ASCENDING, DESCENDING
- }
private String connUrl;
private int threshold;
private int limit;
private int offset;
private boolean hasSort;
- private List<MeasureDesc> sortMeasures;
- private List<OrderEnum> sortOrders;
private boolean acceptPartialResult;
private boolean exactAggregation;
@@ -59,8 +55,6 @@ public class StorageContext {
this.totalScanCount = new AtomicLong();
this.cuboid = null;
this.hasSort = false;
- this.sortOrders = new ArrayList<OrderEnum>();
- this.sortMeasures = new ArrayList<MeasureDesc>();
this.exactAggregation = false;
this.enableLimit = false;
@@ -110,13 +104,6 @@ public class StorageContext {
return this.enableLimit;
}
- public void addSort(MeasureDesc measure, OrderEnum order) {
- if (measure != null) {
- sortMeasures.add(measure);
- sortOrders.add(order);
- }
- }
-
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 6ad27d5..0c30a3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -180,7 +180,9 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
@Override
public DataModelDesc getDataModelDesc() {
- return this.getLatestRealization().getDataModelDesc();
+ if (this.getLatestRealization() != null)
+ return this.getLatestRealization().getDataModelDesc();
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 0ffae69..11b03bd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -25,6 +25,7 @@ import java.util.List;
import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.metadata.measure.DoubleMutable;
import org.apache.kylin.metadata.measure.LongMutable;
@@ -65,6 +66,8 @@ public class Tuple implements ITuple {
ret.values[i] = null;
} else if (this.values[i] instanceof HyperLogLogPlusCounter) {
ret.values[i] = new HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]);
+ } else if (this.values[i] instanceof TopNCounter) {
+ ret.values[i] = null;
} else {
ret.values[i] = this.values[i];
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 309d67f..161cad6 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.cache;
+import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -8,6 +9,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.IdentityUtils;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -76,7 +78,7 @@ public class DynamicCacheTest {
final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations);
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
ITuple aTuple = new TsOnlyTuple(partitionCol, "2011-02-01");
ITuple bTuple = new TsOnlyTuple(partitionCol, "2012-02-01");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index e54e3e0..48b0b1d 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.storage.cache;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
@@ -8,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.IdentityUtils;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -34,7 +36,7 @@ public class StaticCacheTest {
final List<TblColRef> groups = StorageMockUtils.buildGroups();
final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
final TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
- final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+ final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
final List<ITuple> ret = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
index 2e0e376..96c3ace 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -111,7 +111,7 @@
]
},
"last_modified": 1422435345330,
- "model_name": "test_kylin_inner_join_model_desc",
+ "model_name": "test_kylin_left_join_model_desc",
"null_string": null,
"hbase_mapping": {
"column_family": [
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
index 86f8169..a28684f 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
@@ -36,7 +36,8 @@
"columns": [
"lstg_format_name",
"LSTG_SITE_ID",
- "SLR_SEGMENT_CD"
+ "SLR_SEGMENT_CD",
+ "seller_id"
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
index d05a08f..c26ffc5 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
@@ -47,7 +47,8 @@
"columns": [
"lstg_format_name",
"LSTG_SITE_ID",
- "SLR_SEGMENT_CD"
+ "SLR_SEGMENT_CD",
+ "seller_id"
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 378221c..6865457 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -26,11 +26,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
@@ -81,6 +83,8 @@ public class OLAPContext {
public OLAPContext(int seq) {
this.id = seq;
this.storageContext = new StorageContext();
+ this.sortMeasures = Lists.newArrayList();
+ this.sortOrders = Lists.newArrayList();
Map<String, String> parameters = _localPrarameters.get();
if (parameters != null) {
String acceptPartialResult = parameters.get(PRM_ACCEPT_PARTIAL_RESULT);
@@ -111,10 +115,14 @@ public class OLAPContext {
public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
public TupleFilter filter;
public List<JoinDesc> joins = new LinkedList<JoinDesc>();
+ private List<MeasureDesc> sortMeasures;
+ private List<SQLDigest.OrderEnum> sortOrders;
// rewrite info
public Map<String, RelDataType> rewriteFields = new HashMap<String, RelDataType>();
+ public int limit;
+
// hive query
public String sql = "";
@@ -126,7 +134,7 @@ public class OLAPContext {
public SQLDigest getSQLDigest() {
if (sqlDigest == null)
- sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations);
+ sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
return sqlDigest;
}
@@ -144,4 +152,12 @@ public class OLAPContext {
}
this.returnTupleInfo = info;
}
+
+ public void addSort(MeasureDesc measure, SQLDigest.OrderEnum order) {
+ if (measure != null) {
+ sortMeasures.add(measure);
+ sortOrders.add(order);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
index 60acd40..74d5de0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
@@ -78,6 +78,7 @@ public class OLAPLimitRel extends SingleRel implements OLAPRel {
Number limitValue = (Number) (((RexLiteral) localFetch).getValue());
int limit = limitValue.intValue();
this.context.storageContext.setLimit(limit);
+ this.context.limit = limit;
if(localOffset != null) {
Number offsetValue = (Number) (((RexLiteral) localOffset).getValue());
int offset = offsetValue.intValue();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
index fa5dc1d..b023dfd 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.storage.StorageContext;
import com.google.common.base.Preconditions;
@@ -82,12 +83,12 @@ public class OLAPSortRel extends Sort implements OLAPRel {
for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
int index = fieldCollation.getFieldIndex();
- StorageContext.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
+ SQLDigest.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
OLAPRel olapChild = (OLAPRel) this.getInput();
TblColRef orderCol = olapChild.getColumnRowType().getAllColumns().get(index);
MeasureDesc measure = findMeasure(orderCol);
if (measure != null) {
- this.context.storageContext.addSort(measure, order);
+ this.context.addSort(measure, order);
}
this.context.storageContext.markSort();
}
@@ -96,11 +97,11 @@ public class OLAPSortRel extends Sort implements OLAPRel {
this.columnRowType = buildColumnRowType();
}
- private StorageContext.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
+ private SQLDigest.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) { // Calcite DESCENDING maps to DESCENDING; every other direction maps to ASCENDING
if (direction == RelFieldCollation.Direction.DESCENDING) {
- return StorageContext.OrderEnum.DESCENDING;
+ return SQLDigest.OrderEnum.DESCENDING;
} else {
- return StorageContext.OrderEnum.ASCENDING;
+ return SQLDigest.OrderEnum.ASCENDING;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 72d7c4a..4821ce9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
@Test
public void testSingleRunQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql/query44.sql";
+ String queryFileName = "src/test/resources/query/sql/query82.sql";
File sqlFile = new File(queryFileName);
if (sqlFile.exists()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
new file mode 100644
index 0000000..78e30c5
--- /dev/null
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- GMV (sum of price) per seller for cal_dt before 2013-02-01; returns the 100 sellers with the highest GMV
+SELECT
+ seller_id
+ ,sum(test_kylin_fact.price) as GMV
+ FROM test_kylin_fact inner join edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ where test_kylin_fact.cal_dt < DATE '2013-02-01'
+ group by
+ test_kylin_fact.seller_id order by gmv desc limit 100
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..6b62753
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- GMV (sum of price) per (cal_dt, seller_id) over a left join to the calendar; returns the 100 groups with the highest GMV
+SELECT
+ test_kylin_fact.cal_dt, seller_id
+ ,sum(test_kylin_fact.price) as GMV
+ FROM test_kylin_fact
+left JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ group by
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by gmv desc limit 100
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
new file mode 100644
index 0000000..a8b1d02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.apache.kylin.storage.translate.HBaseKeyRange;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Segment tuple iterator for TopN queries. Each HBase row is expected to carry a
+ * TopN counter measure, which is expanded into one tuple per counter entry by
+ * {@link CubeTupleConverter#translateTopNResult(Result, Tuple)}.
+ */
+public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator {
+
+ // tuples expanded from the current HBase row; null when the next row must be fetched
+ private Iterator<Tuple> innerResultIterator;
+
+ public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+ Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
+ List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
+ super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ // the parent installs a converter built with a null TopN column; replace it with one that can expand TopN counters
+ this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
+ }
+
+ /**
+ * Advances to the next expanded tuple, fetching HBase rows and key ranges as needed.
+ * Written as a loop rather than recursion so that long runs of empty TopN counters or
+ * exhausted key ranges cannot overflow the stack. All returned tuples share
+ * {@code oneTuple}, so each one is overwritten when the following tuple is produced.
+ */
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ while (true) {
+ if (innerResultIterator != null) {
+ if (innerResultIterator.hasNext()) {
+ next = innerResultIterator.next();
+ return true;
+ }
+ innerResultIterator = null; // current row fully expanded
+ }
+
+ if (resultIterator == null) {
+ if (rangeIterator.hasNext() == false)
+ return false;
+ resultIterator = doScan(rangeIterator.next());
+ }
+
+ if (resultIterator.hasNext() == false) {
+ // current key range exhausted: release its scanner and move on to the next range
+ closeScanner();
+ resultIterator = null;
+ continue;
+ }
+
+ Result result = resultIterator.next();
+ scanCount++;
+ if (++scanCountDelta >= 1000)
+ flushScanCountDelta();
+ innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 0110fbe..9b2cf66 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -64,25 +64,26 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
public static final int SCAN_CACHE = 1024;
- private final CubeSegment cubeSeg;
+ protected final CubeSegment cubeSeg;
private final TupleFilter filter;
private final Collection<TblColRef> groupBy;
- private final Collection<RowValueDecoder> rowValueDecoders;
+ protected final List<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
private final HTableInterface table;
- private final CubeTupleConverter tupleConverter;
- private final Iterator<HBaseKeyRange> rangeIterator;
- private final Tuple oneTuple; // avoid new instance
+ protected CubeTupleConverter tupleConverter;
+ protected final Iterator<HBaseKeyRange> rangeIterator;
+ protected final Tuple oneTuple; // avoid new instance
private Scan scan;
private ResultScanner scanner;
- private Iterator<Result> resultIterator;
- private int scanCount;
- private int scanCountDelta;
- private Tuple next;
-
+ protected Iterator<Result> resultIterator;
+ protected int scanCount;
+ protected int scanCountDelta;
+ protected Tuple next;
+ protected final Cuboid cuboid;
+
public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -93,12 +94,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.context = context;
this.tableName = cubeSeg.getStorageLocationIdentifier();
- Cuboid cuboid = keyRanges.get(0).getCuboid();
+ cuboid = keyRanges.get(0).getCuboid();
for (HBaseKeyRange range : keyRanges) {
assert cuboid.equals(range.getCuboid());
}
- this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
+ this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
this.oneTuple = new Tuple(returnTupleInfo);
this.rangeIterator = keyRanges.iterator();
@@ -108,9 +109,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
throw new StorageException("Error when open connection to table " + tableName, t);
}
}
-
+
@Override
public boolean hasNext() {
+
if (next != null)
return true;
@@ -136,6 +138,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return true;
}
+
@Override
public Tuple next() {
if (next == null) {
@@ -153,7 +156,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
throw new UnsupportedOperationException();
}
- private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
+ protected final Iterator<Result> doScan(HBaseKeyRange keyRange) {
Iterator<Result> iter = null;
try {
scan = buildScan(keyRange);
@@ -247,7 +250,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return result;
}
- private void closeScanner() {
+ protected void closeScanner() {
flushScanCountDelta();
if (logger.isDebugEnabled() && scan != null) {
@@ -286,7 +289,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
closeTable();
}
- private void flushScanCountDelta() {
+ protected void flushScanCountDelta() {
context.increaseTotalScanCount(scanCountDelta);
scanCountDelta = 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index aca3ca9..58c589b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,20 +18,10 @@
package org.apache.kylin.storage.hbase.cube.v1;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
@@ -68,10 +58,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
//v1
@SuppressWarnings("unused")
@@ -85,17 +72,32 @@ public class CubeStorageQuery implements ICachableStorageQuery {
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
private final String uuid;
+ private Collection<TblColRef> topNColumns;
public CubeStorageQuery(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
this.uuid = cube.getUuid();
+ this.topNColumns = Lists.newArrayList(); // display columns of every TopN measure defined in this cube
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isTopN()) {
+ List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+ topNColumns.add(colRefs.get(colRefs.size() - 1)); // last parameter column is taken as the display column; NOTE(review): throws if colRefs is empty
+ }
+ }
}
-
+
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ // check whether this is a TopN query;
+ checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNCol = extractTopNCol(groups);
+ if (topNCol != null)
+ groups.remove(topNCol);
+
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
@@ -148,7 +150,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
setLimit(filter, context);
HConnection conn = HBaseConnection.get(context.getConnUrl());
- return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+ return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
}
@Override
@@ -179,6 +181,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
if (sqlDigest.metricColumns.contains(column)) {
continue;
}
+
+ // skip topN display col
+ if (topNColumns.contains(column)) {
+ continue;
+ }
+
dimensions.add(column);
}
}
@@ -700,4 +708,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
+ private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { // swaps the digest's SUM for the matching TopN measure function
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TblColRef topNDisplayCol = extractTopNCol(groups);
+ boolean hasTopN = topNDisplayCol != null;
+ // only queries grouping by some TopN measure's display column are rewritten
+ if (hasTopN == false)
+ return;
+ // a TopN measure can stand in only for a single SUM; any other aggregation shape is rejected here
+ if (sqlDigest.aggregations.size() != 1) {
+ throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+ }
+
+ FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+ if (functionDesc.isSum() == false) {
+ throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+ }
+ // find a TopN measure compatible with this SUM whose display column is the grouped column; non-TopN measures are skipped up front
+ FunctionDesc rewriteFunction = null;
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ FunctionDesc candidate = measureDesc.getFunction();
+ if (candidate.isTopN() && candidate.isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(candidate.getParameter().getDisplayColumn())) {
+ rewriteFunction = candidate;
+ break;
+ }
+ }
+
+ if (rewriteFunction == null) {
+ throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+ }
+ // mutates the caller's SQLDigest in place
+ sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+ logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+ }
+
+ private TblColRef extractTopNCol(Collection<TblColRef> colRefs) { // first column of colRefs that is a TopN display column, else null
+ Iterator<TblColRef> candidates = colRefs.iterator();
+ while (candidates.hasNext()) {
+ TblColRef candidate = candidates.next();
+ if (topNColumns.contains(candidate))
+ return candidate;
+ }
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index e569cbd..8813901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,21 +2,31 @@ package org.apache.kylin.storage.hbase.cube.v1;
import java.io.IOException;
import java.util.BitSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowKeyDecoder;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import org.apache.kylin.storage.tuple.Tuple;
import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,20 +40,24 @@ public class CubeTupleConverter {
final TupleInfo tupleInfo;
final RowKeyDecoder rowKeyDecoder;
final List<RowValueDecoder> rowValueDecoders;
- final List<IDerivedColumnFiller> derivedColFillers;
-
+ final List<IDerivedColumnFiller> derivedColFillers;
final int[] dimensionTupleIdx;
final int[][] metricsMeasureIdx;
final int[][] metricsTupleIdx;
+ final TblColRef topNCol;
+ int topNColTupleIdx;
+ int topNMeasureTupleIdx;
+ Dictionary<String> topNColDict;
- public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
+ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
this.cubeSeg = cubeSeg;
this.cuboid = cuboid;
this.tupleInfo = tupleInfo;
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
this.rowValueDecoders = rowValueDecoders;
this.derivedColFillers = Lists.newArrayList();
-
+ this.topNCol = topNCol;
+
List<TblColRef> dimCols = cuboid.getColumns();
// pre-calculate dimension index mapping to tuple
@@ -52,6 +66,7 @@ public class CubeTupleConverter {
TblColRef col = dimCols.get(i);
dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
}
+
// pre-calculate metrics index mapping to tuple
metricsMeasureIdx = new int[rowValueDecoders.size()][];
@@ -64,6 +79,7 @@ public class CubeTupleConverter {
metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
FunctionDesc aggrFunc = measures[mi].getFunction();
+
int tupleIdx;
// a rewrite metrics is identified by its rewrite field name
if (aggrFunc.needRewrite()) {
@@ -80,6 +96,13 @@ public class CubeTupleConverter {
}
}
+ if (this.topNCol != null) {
+ this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+ this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
+
+ this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+ }
+
// prepare derived columns and filler
Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null);
for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -93,6 +116,46 @@ public class CubeTupleConverter {
}
}
+ public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) { // yields one tuple per entry of the row's TopN counter
+ translateResult(hbaseRow, tuple);
+ Object counterValue = tuple.getAllValues()[topNMeasureTupleIdx]; // TopN measure slot, as populated by translateResult
+ assert (counterValue instanceof TopNCounter);
+ return new TopNCounterTupleIterator(tuple, (TopNCounter) counterValue);
+ }
+
+ private class TopNCounterTupleIterator implements Iterator<Tuple> { // emits the shared tuple once per TopN counter entry
+ // inner (non-static) class: reads topNColDict and the tuple indexes of the enclosing converter
+ private final Tuple tuple;
+ private final Iterator<Counter<ByteArray>> topNCounterIterator;
+
+ @SuppressWarnings("unchecked") // TopNCounter arrives raw-typed; its items are assumed to be ByteArray dictionary ids
+ private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+ this.tuple = tuple;
+ this.topNCounterIterator = topNCounter.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return topNCounterIterator.hasNext();
+ }
+
+ @Override
+ public Tuple next() {
+ Counter<ByteArray> counter = topNCounterIterator.next();
+ ByteArray item = counter.getItem();
+ int key = BytesUtil.readUnsigned(item.array(), item.offset(), item.length()); // read only the item's own window of its backing array
+ if (topNColTupleIdx >= 0) // -1 when the display column is not part of the returned tuple
+ tuple.setDimensionValue(topNColTupleIdx, topNColDict.getValueFromId(key));
+ tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
+ return tuple; // same instance every call; overwritten by the next one
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
public void translateResult(Result hbaseRow, Tuple tuple) {
try {
byte[] rowkey = hbaseRow.getRow();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e433b78..831cadb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -58,7 +58,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITuple next;
public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
- Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
+ Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
StorageContext context, TupleInfo returnTupleInfo) {
this.context = context;
@@ -67,8 +67,13 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
+ boolean useTopN = topNCol != null;
for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
- CubeSegmentTupleIterator segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ CubeSegmentTupleIterator segIter;
+ if (useTopN)
+ segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+ else
+ segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
this.segmentIteratorList.add(segIter);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
index 0b4fd07..df52664 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.common;
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -142,7 +144,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
int count = 0;
ITupleIterator iterator = null;
try {
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
while (iterator.hasNext()) {
ITuple tuple = iterator.next();