You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/20 01:59:52 UTC
kylin git commit: KYLIN-976 Support multiple implementations for one aggr function
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 619782ac8 -> 8c2d65808
KYLIN-976 Support multiple implementations for one aggr function
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8c2d6580
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8c2d6580
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8c2d6580
Branch: refs/heads/2.x-staging
Commit: 8c2d658082d8c07194ba9c3a4c152b410df2823e
Parents: 619782a
Author: Yang Li <li...@apache.org>
Authored: Sun Dec 20 08:59:11 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Dec 20 08:59:11 2015 +0800
----------------------------------------------------------------------
.../kylin/measure/MeasureTypeFactory.java | 75 ++++++++++++++--
.../kylin/query/relnode/OLAPAggregateRel.java | 94 +++++++++++++-------
.../apache/kylin/query/routing/QueryRouter.java | 8 --
3 files changed, 132 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index a5144ec..06645a9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -76,8 +76,8 @@ abstract public class MeasureTypeFactory<T> {
// ============================================================================
- private static Map<String, MeasureTypeFactory<?>> factories = Maps.newHashMap();
- private static MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory();
+ private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap();
+ private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2);
static {
init();
@@ -98,19 +98,28 @@ abstract public class MeasureTypeFactory<T> {
* More MeasureType cannot be configured via kylin.properties alone,
* because used in coprocessor, the new classes must be on classpath
* and be packaged into coprocessor jar. This inevitably involves
- * rebuild Kylin from code and redeploy.
+ * rebuild Kylin from code.
*/
// register factories & data type serializers
for (MeasureTypeFactory<?> factory : factoryInsts) {
- String funcName = factory.getAggrFunctionName().toUpperCase();
+ String funcName = factory.getAggrFunctionName();
+ if (funcName.equals(funcName.toUpperCase()) == false)
+ throw new IllegalArgumentException("Aggregation function name '" + funcName + "' must be in upper case");
- String dataTypeName = factory.getAggrDataTypeName().toLowerCase();
+ String dataTypeName = factory.getAggrDataTypeName();
+ if (dataTypeName.equals(dataTypeName.toLowerCase()) == false)
+ throw new IllegalArgumentException("Aggregation data type name '" + dataTypeName + "' must be in lower case");
Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
DataType.register(dataTypeName);
DataTypeSerializer.register(dataTypeName, serializer);
- factories.put(funcName, factory);
+ List<MeasureTypeFactory<?>> list = factories.get(funcName);
+ if (list == null)
+ factories.put(funcName, list = Lists.newArrayListWithCapacity(2));
+ list.add(factory);
}
+
+ defaultFactory.add(new BasicMeasureType.Factory());
}
public static MeasureType<?> create(String funcName, String dataType) {
@@ -120,10 +129,62 @@ abstract public class MeasureTypeFactory<T> {
public static MeasureType<?> create(String funcName, DataType dataType) {
funcName = funcName.toUpperCase();
- MeasureTypeFactory<?> factory = factories.get(funcName);
+ List<MeasureTypeFactory<?>> factory = factories.get(funcName);
if (factory == null)
factory = defaultFactory;
- return factory.createMeasureType(funcName, dataType);
+ // a special case where in early stage of sql parsing, the data type is unknown; only needRewrite() is required at that stage
+ if (dataType == null) {
+ return new NeedRewriteOnlyMeasureType(funcName, factory);
+ }
+
+ // the normal case, only one factory for a function
+ if (factory.size() == 1) {
+ return factory.get(0).createMeasureType(funcName, dataType);
+ }
+
+ // sometimes multiple factories are registered for the same function, then data types must tell them apart
+ for (MeasureTypeFactory<?> f : factory) {
+ if (f.getAggrDataTypeName().equals(dataType.getName()))
+ return f.createMeasureType(funcName, dataType);
+ }
+ throw new IllegalStateException("No MeasureTypeFactory registered for function " + funcName + " accepts data type '" + dataType.getName() + "'");
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class NeedRewriteOnlyMeasureType extends MeasureType {
+
+ private Boolean needRewrite;
+
+ public NeedRewriteOnlyMeasureType(String funcName, List<MeasureTypeFactory<?>> factory) {
+ for (MeasureTypeFactory<?> f : factory) {
+ boolean b = f.createMeasureType(funcName, null).needRewrite();
+ if (needRewrite == null)
+ needRewrite = Boolean.valueOf(b);
+ else if (needRewrite.booleanValue() != b)
+ throw new IllegalStateException("needRewrite() of factories " + factory + " does not have consensus");
+ }
+ }
+
+ @Override
+ public MeasureIngester newIngester() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MeasureAggregator newAggregator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean needRewrite() {
+ return needRewrite;
+ }
+
+ @Override
+ public Class getRewriteCalciteAggrFunctionClass() {
+ throw new UnsupportedOperationException();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 0c53703..eed5636 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -56,11 +56,13 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
*/
@@ -133,11 +135,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
this.columnRowType = buildColumnRowType();
this.afterAggregate = this.context.afterAggregate;
- // only translate the first aggregation
+ // only translate the innermost aggregation
if (!this.afterAggregate) {
translateGroupBy();
- fillbackOptimizedColumn();
- translateAggregation();
+ this.context.aggregations.addAll(this.aggregations);
this.context.afterAggregate = true;
} else {
for (AggregateCall aggCall : aggCalls) {
@@ -224,14 +225,72 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
context.groupByColumns.addAll(this.groups);
}
+ @Override
+ public void implementRewrite(RewriteImplementor implementor) {
+ // only rewrite the innermost aggregation
+ if (!this.afterAggregate) {
+ translateAggregation();
+ buildRewriteFieldsAndMetricsColumns();
+ }
+
+ implementor.visitChild(this, getInput());
+
+ // only rewrite the innermost aggregation
+ if (!this.afterAggregate && RewriteImplementor.needRewrite(this.context)) {
+ // rewrite the aggCalls
+ this.rewriteAggCalls = new ArrayList<AggregateCall>(aggCalls.size());
+ for (int i = 0; i < this.aggCalls.size(); i++) {
+ AggregateCall aggCall = this.aggCalls.get(i);
+ FunctionDesc cubeFunc = this.context.aggregations.get(i);
+ if (cubeFunc.needRewrite()) {
+ aggCall = rewriteAggregateCall(aggCall, cubeFunc);
+ }
+ this.rewriteAggCalls.add(aggCall);
+ }
+ }
+
+ // rebuild rowType & columnRowType
+ this.rowType = this.deriveRowType();
+ this.columnRowType = this.buildColumnRowType();
+
+ }
+
private void translateAggregation() {
+ // now the realization is known, replace aggregations with what's defined on MeasureDesc
+ List<MeasureDesc> measures = this.context.realization.getMeasures();
+ List<FunctionDesc> newAggrs = Lists.newArrayList();
+ for (FunctionDesc aggFunc : this.aggregations) {
+ newAggrs.add(findInMeasures(aggFunc, measures));
+ }
+ this.aggregations.clear();
+ this.aggregations.addAll(newAggrs);
+ this.context.aggregations.clear();
+ this.context.aggregations.addAll(newAggrs);
+ }
+
+ private FunctionDesc findInMeasures(FunctionDesc aggFunc, List<MeasureDesc> measures) {
+ for (MeasureDesc m : measures) {
+ if (aggFunc.equals(m.getFunction()))
+ return m.getFunction();
+ }
+ return aggFunc;
+ }
+
+ private void buildRewriteFieldsAndMetricsColumns() {
+ fillbackOptimizedColumn();
+
ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
for (int i = 0; i < this.aggregations.size(); i++) {
FunctionDesc aggFunc = this.aggregations.get(i);
- context.aggregations.add(aggFunc);
+
+ if (aggFunc.isDimensionAsMetric()) {
+ // parameter col refs may contain nulls (the code this replaces in QueryRouter skipped them)
+ for (TblColRef col : aggFunc.getParameter().getColRefs()) {
+ if (col != null) {
+ this.context.groupByColumns.add(col);
+ }
+ }
+ continue; // skip rewrite, let calcite handle
+ }
+
if (aggFunc.needRewrite()) {
String rewriteFieldName = aggFunc.getRewriteFieldName();
- context.rewriteFields.put(rewriteFieldName, null);
+ this.context.rewriteFields.put(rewriteFieldName, null);
TblColRef column = buildRewriteColumn(aggFunc);
this.context.metricsColumns.add(column);
@@ -263,31 +322,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
}
}
- @Override
- public void implementRewrite(RewriteImplementor implementor) {
- implementor.visitChild(this, getInput());
-
- // only rewrite the first aggregation
- if (!this.afterAggregate && RewriteImplementor.needRewrite(this.context)) {
- // rewrite the aggCalls
- this.rewriteAggCalls = new ArrayList<AggregateCall>(aggCalls.size());
- for (int i = 0; i < this.aggCalls.size(); i++) {
- AggregateCall aggCall = this.aggCalls.get(i);
- FunctionDesc cubeFunc = this.context.aggregations.get(i);
- if (cubeFunc.needRewrite()) {
- aggCall = rewriteAggregateCall(aggCall, cubeFunc);
- }
- this.rewriteAggCalls.add(aggCall);
- }
- }
-
- // rebuild rowType & columnRowType
- //ClassUtil.updateFinalField(Aggregate.class, "aggCalls", this, rewriteAggCalls);
- this.rowType = this.deriveRowType(); // this does not work coz super.aggCalls is final
- this.columnRowType = this.buildColumnRowType();
-
- }
-
private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) {
// rebuild parameters
http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7493e08..c4d0fbe 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -23,7 +23,6 @@ import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
@@ -84,13 +83,6 @@ public class QueryRouter {
if (inf instanceof DimensionAsMeasure) {
FunctionDesc functionDesc = ((DimensionAsMeasure) inf).getMeasureFunction();
functionDesc.setDimensionAsMetric(true);
- olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
- for (TblColRef col : functionDesc.getParameter().getColRefs()) {
- if (col != null) {
- olapContext.metricsColumns.remove(col);
- olapContext.groupByColumns.add(col);
- }
- }
+ // only the flag is set here; OLAPAggregateRel checks isDimensionAsMetric() during rewrite and adds the parameter columns to groupByColumns
logger.info("Adjust DimensionAsMeasure for " + functionDesc);
}
}