You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/19 03:05:40 UTC
[27/50] [abbrv] kylin git commit: KYLIN-2088 Refactor MeasureType to
allow multiple UDAF defined on a measure type
KYLIN-2088 Refactor MeasureType to allow multiple UDAF defined on a measure type
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0018a212
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0018a212
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0018a212
Branch: refs/heads/master-cdh5.7
Commit: 0018a212470bfe1e8edd3603d383da80f2ffd322
Parents: b4c970a
Author: Yang Li <li...@apache.org>
Authored: Sun Oct 16 23:44:49 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Oct 16 23:44:49 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 1 -
.../org/apache/kylin/measure/MeasureType.java | 16 +++--
.../kylin/measure/MeasureTypeFactory.java | 46 +++++++++++---
.../kylin/measure/basic/BasicMeasureType.java | 6 --
.../kylin/measure/bitmap/BitmapMeasureType.java | 43 ++++++++-----
.../dim/DimCountDistinctMeasureType.java | 10 ++-
.../ExtendedColumnMeasureType.java | 6 +-
.../kylin/measure/hllc/HLLCMeasureType.java | 10 ++-
.../kylin/measure/raw/RawMeasureType.java | 6 +-
.../kylin/measure/topn/TopNMeasureType.java | 5 --
.../kylin/metadata/realization/SQLDigest.java | 43 ++++++++-----
.../kylin/engine/mr/steps/CubeReducerTest.java | 5 +-
.../kylin/storage/hbase/ITStorageTest.java | 3 +-
.../kylin/query/relnode/ColumnRowType.java | 7 +++
.../kylin/query/relnode/OLAPAggregateRel.java | 66 +++++++++++---------
.../apache/kylin/query/relnode/OLAPContext.java | 15 +++--
.../kylin/query/schema/OLAPSchemaFactory.java | 13 +++-
.../cube/MeasureTypeOnlyAggrInBaseTest.java | 5 --
18 files changed, 187 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7dacd06..4942081 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -766,7 +766,6 @@ abstract public class KylinConfigBase implements Serializable {
public Map<String, String> getUDFs() {
Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf.");
- udfMap.put("intersect_count", "org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc");
return udfMap;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index e7312f2..de1b442 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -102,22 +102,20 @@ abstract public class MeasureType<T> {
* Query Rewrite
* ---------------------------------------------------------------------------- */
- // TODO support user defined Calcite aggr function
-
/** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */
abstract public boolean needRewrite();
- /** Does the rewrite involves an extra field for the pre-calculated */
+ /** Does the rewrite involves an extra field to hold the pre-calculated */
public boolean needRewriteField() {
return true;
}
- /** Returns a Calcite aggregation function implementation class */
- abstract public Class<?> getRewriteCalciteAggrFunctionClass();
-
- /** Some measure may return different class depends on call name, eg. BitmapMeasureType */
- public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
- return getRewriteCalciteAggrFunctionClass();
+ /**
+ * Returns a map from UDAF to Calcite aggregation function implementation class.
+ * There can be zero or more UDAF defined on a measure type.
+ */
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return null;
}
/* ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index 17d841a..c5bd482 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -32,6 +32,7 @@ import org.apache.kylin.measure.raw.RawMeasureType;
import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +87,10 @@ abstract public class MeasureTypeFactory<T> {
// ============================================================================
- private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap();
- private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2);
+ final private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap();
+ final private static Map<String, Class<?>> udafMap = Maps.newHashMap(); // a map from UDAF to Calcite aggregation function implementation class
+ final private static Map<String, MeasureTypeFactory> udafFactories = Maps.newHashMap(); // a map from UDAF to its owner factory
+ final private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2);
static {
init();
@@ -110,7 +113,8 @@ abstract public class MeasureTypeFactory<T> {
logger.info("Checking custom measure types from kylin config");
try {
- for (String customFactory : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().values()) {
+ Map<String, String> customMeasureTypes = KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes();
+ for (String customFactory : customMeasureTypes.values()) {
try {
logger.info("Checking custom measure types from kylin config: " + customFactory);
factoryInsts.add((MeasureTypeFactory<?>) Class.forName(customFactory).newInstance());
@@ -132,9 +136,10 @@ abstract public class MeasureTypeFactory<T> {
throw new IllegalArgumentException("Aggregation data type name '" + dataTypeName + "' must be in lower case");
Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
- logger.info("registering " + dataTypeName);
+ logger.info("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass());
DataType.register(dataTypeName);
DataTypeSerializer.register(dataTypeName, serializer);
+ registerUDAF(factory);
List<MeasureTypeFactory<?>> list = factories.get(funcName);
if (list == null)
factories.put(funcName, list = Lists.newArrayListWithCapacity(2));
@@ -144,13 +149,40 @@ abstract public class MeasureTypeFactory<T> {
defaultFactory.add(new BasicMeasureType.Factory());
}
+ private static void registerUDAF(MeasureTypeFactory<?> factory) {
+ MeasureType<?> type = factory.createMeasureType(factory.getAggrFunctionName(), DataType.getType(factory.getAggrDataTypeName()));
+ Map<String, Class<?>> udafs = type.getRewriteCalciteAggrFunctions();
+ if (type.needRewrite() == false || udafs == null)
+ return;
+
+ for (String udaf : udafs.keySet()) {
+ udaf = udaf.toUpperCase();
+ if (udaf.equals(FunctionDesc.FUNC_COUNT_DISTINCT))
+ continue; // skip built-in function
+
+ if (udafFactories.containsKey(udaf))
+ throw new IllegalStateException("UDAF '" + udaf + "' was dup declared by " + udafFactories.get(udaf) + " and " + factory);
+
+ udafFactories.put(udaf, factory);
+ udafMap.put(udaf, udafs.get(udaf));
+ }
+ }
+
+ public static Map<String, Class<?>> getUDAFs() {
+ return udafMap;
+ }
+
+ public static Map<String, MeasureTypeFactory> getUDAFFactories() {
+ return udafFactories;
+ }
+
public static MeasureType<?> create(String funcName, String dataType) {
return create(funcName, DataType.getType(dataType));
}
public static MeasureType<?> createNoRewriteFieldsMeasureType(String funcName, DataType dataType) {
// currently only has DimCountDistinctAgg
- if (funcName.equalsIgnoreCase("COUNT_DISTINCT")) {
+ if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) {
return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, dataType);
}
@@ -213,8 +245,8 @@ abstract public class MeasureTypeFactory<T> {
}
@Override
- public Class getRewriteCalciteAggrFunctionClass() {
- throw new UnsupportedOperationException();
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index 4ab4584..ed493a1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -143,10 +143,4 @@ public class BasicMeasureType extends MeasureType {
public boolean needRewrite() {
return !isSum();
}
-
- @Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 2b88e21..8e2b2f7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -33,12 +33,16 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
+
+import com.google.common.collect.ImmutableMap;
/**
* Created by sunyerui on 15/12/10.
*/
public class BitmapMeasureType extends MeasureType<BitmapCounter> {
- public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+ public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT;
public static final String FUNC_INTERSECT_COUNT_DISTINCT = "INTERSECT_COUNT";
public static final String DATATYPE_BITMAP = "bitmap";
@@ -151,30 +155,37 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
}
}
- @Override
- public boolean needRewrite() {
+ // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
+ private boolean needDictionaryColumn(FunctionDesc functionDesc) {
+ DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
+ if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
+ return false;
+ }
return true;
}
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return BitmapDistinctCountAggFunc.class;
+ public boolean needRewrite() {
+ return true;
}
+ static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(//
+ FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, //
+ FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class);
+
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
- if (callName != null && callName.equalsIgnoreCase(FUNC_INTERSECT_COUNT_DISTINCT)) {
- return BitmapIntersectDistinctCountAggFunc.class;
- }
- return BitmapDistinctCountAggFunc.class;
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return UDAF_MAP;
}
- // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
- private boolean needDictionaryColumn(FunctionDesc functionDesc) {
- DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
- if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
- return false;
+ @Override
+ public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
+ for (SQLCall call : sqlDigest.aggrSqlCalls) {
+ if (FUNC_INTERSECT_COUNT_DISTINCT.equals(call.function)) {
+ TblColRef col = (TblColRef) call.args.get(1);
+ if (!sqlDigest.groupbyColumns.contains(col))
+ sqlDigest.groupbyColumns.add(col);
+ }
}
- return true;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
index 9fe1075..0b3fd94 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
@@ -19,15 +19,19 @@
package org.apache.kylin.measure.dim;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureTypeFactory;
import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.realization.SQLDigest;
+import com.google.common.collect.ImmutableMap;
+
/**
* Created by dongli on 4/20/16.
*/
@@ -76,9 +80,11 @@ public class DimCountDistinctMeasureType extends MeasureType<Object> {
return false;
}
+ static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FunctionDesc.FUNC_COUNT_DISTINCT, DimCountDistinctAggFunc.class);
+
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return DimCountDistinctAggFunc.class;
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return UDAF_MAP;
}
public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
index 796f1f7..c8f01ad 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
@@ -227,6 +227,7 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
};
}
+ @SuppressWarnings("serial")
@Override
public MeasureAggregator<ByteArray> newAggregator() {
return new MeasureAggregator<ByteArray>() {
@@ -268,9 +269,4 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
public boolean needRewrite() {
return false;
}
-
- @Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index bd5013e..0e58dca 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -31,9 +31,11 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import com.google.common.collect.ImmutableMap;
+
public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
- public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+ public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT;
public static final String DATATYPE_HLLC = "hllc";
public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> {
@@ -116,9 +118,11 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
return true;
}
+ static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FUNC_COUNT_DISTINCT, HLLDistinctCountAggFunc.class);
+
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return HLLDistinctCountAggFunc.class;
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return UDAF_MAP;
}
public static boolean isCountDistinct(FunctionDesc func) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
index 50715ec..3a49d31 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
@@ -75,6 +75,7 @@ public class RawMeasureType extends MeasureType<List<ByteArray>> {
}
}
+ @SuppressWarnings("unused")
private final DataType dataType;
public RawMeasureType(String funcName, DataType dataType) {
@@ -191,11 +192,6 @@ public class RawMeasureType extends MeasureType<List<ByteArray>> {
}
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return null;
- }
-
- @Override
public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
if (sqlDigest.isRawQuery) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 800ca88..33ab314 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -288,11 +288,6 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
}
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return null;
- }
-
- @Override
public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
for (MeasureDesc measureDesc : measureDescs) {
FunctionDesc topnFunc = measureDesc.getFunction();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index d2bba66..4887abb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -18,7 +18,8 @@
package org.apache.kylin.metadata.realization;
-import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -26,6 +27,8 @@ import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import com.google.common.collect.ImmutableList;
+
/**
*/
public class SQLDigest {
@@ -34,29 +37,41 @@ public class SQLDigest {
ASCENDING, DESCENDING
}
+ public static class SQLCall {
+ public final String function;
+ public final List<Object> args;
+
+ public SQLCall(String function, Iterable<Object> args) {
+ this.function = function;
+ this.args = ImmutableList.copyOf(args);
+ }
+ }
+
public String factTable;
public TupleFilter filter;
- public Collection<JoinDesc> joinDescs;
- public Collection<TblColRef> allColumns;
- public Collection<TblColRef> groupbyColumns;
- public Collection<TblColRef> filterColumns;
- public Collection<TblColRef> metricColumns;
- public Collection<FunctionDesc> aggregations;
- public Collection<MeasureDesc> sortMeasures;
- public Collection<OrderEnum> sortOrders;
+ public List<JoinDesc> joinDescs;
+ public Set<TblColRef> allColumns;
+ public List<TblColRef> groupbyColumns;
+ public Set<TblColRef> filterColumns;
+ public Set<TblColRef> metricColumns;
+ public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply
+ public List<SQLCall> aggrSqlCalls; // sql level aggregation function call
+ public List<MeasureDesc> sortMeasures;
+ public List<OrderEnum> sortOrders;
public boolean isRawQuery;
- //initialized when org.apache.kylin.query.routing.QueryRouter.selectRealization()
- public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
- Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
+ public SQLDigest(String factTable, TupleFilter filter, List<JoinDesc> joinDescs, Set<TblColRef> allColumns, //
+ List<TblColRef> groupbyColumns, Set<TblColRef> filterColumns, Set<TblColRef> metricColumns, //
+ List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, List<MeasureDesc> sortMeasures, List<OrderEnum> sortOrders) {
this.factTable = factTable;
this.filter = filter;
this.joinDescs = joinDescs;
this.allColumns = allColumns;
this.groupbyColumns = groupbyColumns;
this.filterColumns = filterColumns;
- this.metricColumns = aggregatedColumns;
- this.aggregations = aggregateFunnc;
+ this.metricColumns = metricColumns;
+ this.aggregations = aggregations;
+ this.aggrSqlCalls = aggrSqlCalls;
this.sortMeasures = sortMeasures;
this.sortOrders = sortOrders;
this.isRawQuery = isRawQuery();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index 5e687a5..97dd750 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -27,6 +27,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
@@ -197,8 +198,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
}
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return origMeasureType.getRewriteCalciteAggrFunctionClass();
+ public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+ return origMeasureType.getRewriteCalciteAggrFunctions();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 136342d..4121c02 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -33,6 +33,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
@@ -144,7 +145,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
int count = 0;
ITupleIterator iterator = null;
try {
- SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
+ SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, Collections.<SQLCall> emptyList(), new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
while (iterator.hasNext()) {
ITuple tuple = iterator.next();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
index 095b5e2..f2894e7 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
@@ -49,6 +49,13 @@ public class ColumnRowType {
return columns.get(index);
}
+ public TblColRef getColumnByIndexNullable(int index) {
+ if (index < 0 || index >= columns.size())
+ return null;
+ else
+ return columns.get(index);
+ }
+
public int getIndexByName(String columnName) {
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).getName().equals(columnName)) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8ecb808..9c4f287 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -55,14 +55,14 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
import org.apache.kylin.query.schema.OLAPTable;
import com.google.common.base.Preconditions;
@@ -80,23 +80,28 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
AGGR_FUNC_MAP.put("$SUM0", "SUM");
AGGR_FUNC_MAP.put("COUNT", "COUNT");
AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
- AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, "COUNT_DISTINCT");
AGGR_FUNC_MAP.put("MAX", "MAX");
AGGR_FUNC_MAP.put("MIN", "MIN");
- for (String customAggrFunc : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().keySet()) {
- AGGR_FUNC_MAP.put(customAggrFunc.trim().toUpperCase(), customAggrFunc.trim().toUpperCase());
+ Map<String, MeasureTypeFactory> udafFactories = MeasureTypeFactory.getUDAFFactories();
+ for (String udaf : udafFactories.keySet()) {
+ AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
}
}
- private static String getFuncName(AggregateCall aggCall) {
- String aggName = aggCall.getAggregation().getName();
+ private static String getSqlFuncName(AggregateCall aggCall) {
+ String sqlName = aggCall.getAggregation().getName();
if (aggCall.isDistinct()) {
- aggName = aggName + "_DISTINCT";
+ sqlName = sqlName + "_DISTINCT";
}
- String funcName = AGGR_FUNC_MAP.get(aggName);
+ return sqlName;
+ }
+
+ private static String getAggrFuncName(AggregateCall aggCall) {
+ String sqlName = getSqlFuncName(aggCall);
+ String funcName = AGGR_FUNC_MAP.get(sqlName);
if (funcName == null) {
- throw new IllegalStateException("Don't suppoprt aggregation " + aggName);
+ throw new IllegalStateException("Don't suppoprt aggregation " + sqlName);
}
return funcName;
}
@@ -151,7 +156,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
// only translate the innermost aggregation
if (!this.afterAggregate) {
- translateGroupBy();
+ this.context.groupByColumns.addAll(this.groups);
this.context.aggregations.addAll(this.aggregations);
this.context.afterAggregate = true;
} else {
@@ -226,15 +231,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
Set<TblColRef> columns = inputColumnRowType.getSourceColumnsByIndex(i);
this.groups.addAll(columns);
}
- // Some UDAF may group data by itself, add group key into groups, prevents aggregate at cube storage server side
- for (AggregateCall aggCall : this.rewriteAggCalls) {
- String aggregateName = aggCall.getAggregation().getName();
- if (aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT)) {
- int index = aggCall.getArgList().get(1);
- TblColRef column = inputColumnRowType.getColumnByIndex(index);
- groups.add(column);
- }
- }
}
private void buildAggregations() {
@@ -254,17 +250,13 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
}
FunctionDesc aggFunc = new FunctionDesc();
- String funcName = getFuncName(aggCall);
+ String funcName = getAggrFuncName(aggCall);
aggFunc.setExpression(funcName);
aggFunc.setParameter(parameter);
this.aggregations.add(aggFunc);
}
}
- private void translateGroupBy() {
- context.groupByColumns.addAll(this.groups);
- }
-
@Override
public void implementRewrite(RewriteImplementor implementor) {
// only rewrite the innermost aggregation
@@ -286,6 +278,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
aggCall = rewriteAggregateCall(aggCall, cubeFunc);
}
this.rewriteAggCalls.add(aggCall);
+ this.context.aggrSqlCalls.add(toSqlCall(aggCall));
}
}
@@ -295,6 +288,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
+ private SQLCall toSqlCall(AggregateCall aggCall) {
+ ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
+
+ String function = getSqlFuncName(aggCall);
+ List<Object> args = Lists.newArrayList();
+ for (Integer index : aggCall.getArgList()) {
+ TblColRef col = inputColumnRowType.getColumnByIndexNullable(index);
+ args.add(col);
+ }
+ return new SQLCall(function, args);
+ }
+
private void translateAggregation() {
// now the realization is known, replace aggregations with what's defined on MeasureDesc
List<MeasureDesc> measures = this.context.realization.getMeasures();
@@ -371,8 +376,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) {
- //if it's not a cube, then the "needRewriteField func" should not resort to any rewrite fields,
- // which do not exist at all
+ // if it's not a cube, then the "needRewriteField func" should not resort to any rewrite fields, which do not exist at all
if (noPrecaculatedFieldsAvailable() && func.needRewriteField()) {
logger.info(func + "skip rewriteAggregateCall because no pre-aggregated field available");
return aggCall;
@@ -391,16 +395,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
// rebuild function
- String callName = aggCall.getAggregation().getName();
+ String callName = getSqlFuncName(aggCall);
RelDataType fieldType = aggCall.getType();
SqlAggFunction newAgg = aggCall.getAggregation();
+ Map<String, Class<?>> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions();
if (func.isCount()) {
newAgg = SqlStdOperatorTable.SUM0;
- } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) {
- newAgg = createCustomAggFunction(callName, fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName));
+ } else if (udafMap != null && udafMap.containsKey(callName)) {
+ newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
}
// rebuild aggregate call
+ @SuppressWarnings("deprecation")
AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
return newAggCall;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 41a3b4d..cdec665 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -36,6 +37,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.query.schema.OLAPSchema;
import org.apache.kylin.storage.StorageContext;
@@ -116,11 +118,12 @@ public class OLAPContext {
// cube metadata
public IRealization realization;
- public Collection<TblColRef> allColumns = new HashSet<TblColRef>();
- public Collection<TblColRef> groupByColumns = new ArrayList<TblColRef>();
- public Collection<TblColRef> metricsColumns = new HashSet<TblColRef>();
- public List<FunctionDesc> aggregations = new ArrayList<FunctionDesc>();
- public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
+ public Set<TblColRef> allColumns = new HashSet<TblColRef>();
+ public List<TblColRef> groupByColumns = new ArrayList<TblColRef>();
+ public Set<TblColRef> metricsColumns = new HashSet<TblColRef>();
+ public List<FunctionDesc> aggregations = new ArrayList<FunctionDesc>(); // storage level measure type, on top of which various sql aggr function may apply
+ public List<SQLCall> aggrSqlCalls = new ArrayList<SQLCall>(); // sql level aggregation function call
+ public Set<TblColRef> filterColumns = new HashSet<TblColRef>();
public TupleFilter filter;
public List<JoinDesc> joins = new LinkedList<JoinDesc>();
private List<MeasureDesc> sortMeasures;
@@ -144,7 +147,7 @@ public class OLAPContext {
public SQLDigest getSQLDigest() {
if (sqlDigest == null)
- sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
+ sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, aggrSqlCalls, sortMeasures, sortOrders);
return sqlDigest;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index e42d9be..93f06dd 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -25,6 +25,7 @@ import java.io.Writer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.calcite.schema.Schema;
@@ -34,6 +35,7 @@ import org.apache.calcite.util.ConversionUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.MeasureTypeFactory;
import org.apache.kylin.metadata.model.DatabaseDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
@@ -41,6 +43,8 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
/**
*/
public class OLAPSchemaFactory implements SchemaFactory {
@@ -138,9 +142,14 @@ public class OLAPSchemaFactory implements SchemaFactory {
}
private static void createOLAPSchemaFunctions(Writer out) throws IOException {
- out.write(" \"functions\": [\n");
- Map<String, String> udfs = KylinConfig.getInstanceFromEnv().getUDFs();
+ Map<String, String> udfs = Maps.newHashMap();
+ udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs());
+ for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
+ udfs.put(entry.getKey(), entry.getValue().getName());
+ }
+
int index = 0;
+ out.write(" \"functions\": [\n");
for (Map.Entry<String, String> udf : udfs.entrySet()) {
String udfName = udf.getKey().trim().toUpperCase();
String udfClassName = udf.getValue().trim();
http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
index 1353862..e07ea91 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
@@ -137,11 +137,6 @@ public class MeasureTypeOnlyAggrInBaseTest extends LocalFileMetadataTestCase {
}
@Override
- public Class<?> getRewriteCalciteAggrFunctionClass() {
- return null;
- }
-
- @Override
public boolean onlyAggrInBaseCuboid() {
return true;
}