You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/20 01:59:52 UTC

kylin git commit: KYLIN-976 Support multiple implementations for one aggr function

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 619782ac8 -> 8c2d65808


KYLIN-976 Support multiple implementations for one aggr function


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8c2d6580
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8c2d6580
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8c2d6580

Branch: refs/heads/2.x-staging
Commit: 8c2d658082d8c07194ba9c3a4c152b410df2823e
Parents: 619782a
Author: Yang Li <li...@apache.org>
Authored: Sun Dec 20 08:59:11 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Dec 20 08:59:11 2015 +0800

----------------------------------------------------------------------
 .../kylin/measure/MeasureTypeFactory.java       | 75 ++++++++++++++--
 .../kylin/query/relnode/OLAPAggregateRel.java   | 94 +++++++++++++-------
 .../apache/kylin/query/routing/QueryRouter.java |  8 --
 3 files changed, 132 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index a5144ec..06645a9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -76,8 +76,8 @@ abstract public class MeasureTypeFactory<T> {
 
     // ============================================================================
 
-    private static Map<String, MeasureTypeFactory<?>> factories = Maps.newHashMap();
-    private static MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory();
+    private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap(); // upper-case aggr function name -> factories registered for it, filled by init()
+    private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2); // fallback used by create() for functions with no registered factory; init() adds BasicMeasureType.Factory
 
     static {
         init();
@@ -98,19 +98,28 @@ abstract public class MeasureTypeFactory<T> {
          * More MeasureType cannot be configured via kylin.properties alone,
          * because used in coprocessor, the new classes must be on classpath
          * and be packaged into coprocessor jar. This inevitably involves
-         * rebuild Kylin from code and redeploy.
+         * rebuilding Kylin from code.
          */
 
         // register factories & data type serializers
         for (MeasureTypeFactory<?> factory : factoryInsts) {
-            String funcName = factory.getAggrFunctionName().toUpperCase();
+            String funcName = factory.getAggrFunctionName(); // used as map key as-is; create() upper-cases the name it looks up
+            if (!funcName.equals(funcName.toUpperCase()))
+                throw new IllegalArgumentException("Aggregation function name '" + funcName + "' must be in upper case");
             String dataTypeName = factory.getAggrDataTypeName().toLowerCase();
+            if (!dataTypeName.equals(factory.getAggrDataTypeName())) // compare with the raw name; dataTypeName is already lower-cased
+                throw new IllegalArgumentException("Aggregation data type name '" + factory.getAggrDataTypeName() + "' must be in lower case");
             Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
 
             DataType.register(dataTypeName);
             DataTypeSerializer.register(dataTypeName, serializer);
-            factories.put(funcName, factory);
+            List<MeasureTypeFactory<?>> list = factories.get(funcName);
+            if (list == null)
+                factories.put(funcName, list = Lists.newArrayListWithCapacity(2));
+            list.add(factory);
         }
+
+        defaultFactory.add(new BasicMeasureType.Factory());
     }
 
     public static MeasureType<?> create(String funcName, String dataType) {
@@ -120,10 +129,62 @@ abstract public class MeasureTypeFactory<T> {
     public static MeasureType<?> create(String funcName, DataType dataType) {
         funcName = funcName.toUpperCase();
 
-        MeasureTypeFactory<?> factory = factories.get(funcName);
+        List<MeasureTypeFactory<?>> factory = factories.get(funcName);
         if (factory == null)
             factory = defaultFactory;
 
-        return factory.createMeasureType(funcName, dataType);
+        // special case: early in SQL parsing the data type is still unknown and only needRewrite() is queried
+        if (dataType == null) {
+            return new NeedRewriteOnlyMeasureType(funcName, factory);
+        }
+
+        // the normal case, only one factory for a function
+        if (factory.size() == 1) {
+            return factory.get(0).createMeasureType(funcName, dataType);
+        }
+
+        // sometimes multiple factories are registered for the same function, then data types must tell them apart
+        for (MeasureTypeFactory<?> f : factory) {
+            if (f.getAggrDataTypeName().equals(dataType.getName()))
+                return f.createMeasureType(funcName, dataType);
+        }
+        throw new IllegalStateException("None of the factories registered for aggregation function '" + funcName + "' supports data type '" + dataType.getName() + "'");
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static class NeedRewriteOnlyMeasureType extends MeasureType {
+        // placeholder used while the data type is unknown: answers needRewrite() only, every other method throws
+        private Boolean needRewrite;
+
+        public NeedRewriteOnlyMeasureType(String funcName, List<MeasureTypeFactory<?>> factory) {
+            for (MeasureTypeFactory<?> f : factory) {
+                boolean b = f.createMeasureType(funcName, null).needRewrite();
+                if (needRewrite == null)
+                    needRewrite = Boolean.valueOf(b);
+                else if (needRewrite.booleanValue() != b)
+                    throw new IllegalStateException("needRewrite() of factories " + factory + " does not have consensus");
+            }
+        }
+
+        @Override
+        public MeasureIngester newIngester() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public MeasureAggregator newAggregator() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean needRewrite() {
+            return needRewrite; // assumes a non-empty factory list (true for every list init() builds); an empty one would leave this null
+        }
+
+        @Override
+        public Class getRewriteCalciteAggrFunctionClass() {
+            throw new UnsupportedOperationException();
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 0c53703..eed5636 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -56,11 +56,13 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -133,11 +135,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         this.columnRowType = buildColumnRowType();
         this.afterAggregate = this.context.afterAggregate;
 
-        // only translate the first aggregation
+        // only the innermost aggregation registers its group-by and aggregations here; binding them to realization measures is deferred to implementRewrite()
         if (!this.afterAggregate) {
             translateGroupBy();
-            fillbackOptimizedColumn();
-            translateAggregation();
+            this.context.aggregations.addAll(this.aggregations);
             this.context.afterAggregate = true;
         } else {
             for (AggregateCall aggCall : aggCalls) {
@@ -224,14 +225,72 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         context.groupByColumns.addAll(this.groups);
     }
 
+    @Override
+    public void implementRewrite(RewriteImplementor implementor) {
+        // innermost aggregation only: the realization is known now, so bind aggregations to its measures and register rewrite fields
+        if (!this.afterAggregate) {
+            translateAggregation();
+            buildRewriteFieldsAndMetricsColumns();
+        }
+
+        implementor.visitChild(this, getInput());
+
+        // innermost aggregation only: once the child is rewritten, swap in rewritten aggCalls for functions that need it
+        if (!this.afterAggregate && RewriteImplementor.needRewrite(this.context)) {
+            // rewrite the aggCalls
+            this.rewriteAggCalls = new ArrayList<AggregateCall>(aggCalls.size());
+            for (int i = 0; i < this.aggCalls.size(); i++) {
+                AggregateCall aggCall = this.aggCalls.get(i);
+                FunctionDesc cubeFunc = this.context.aggregations.get(i);
+                if (cubeFunc.needRewrite()) {
+                    aggCall = rewriteAggregateCall(aggCall, cubeFunc);
+                }
+                this.rewriteAggCalls.add(aggCall);
+            }
+        }
+
+        // rebuild rowType & columnRowType
+        this.rowType = this.deriveRowType();
+        this.columnRowType = this.buildColumnRowType();
+
+    }
+
     private void translateAggregation() {
+        // now the realization is known, replace aggregations with what's defined on MeasureDesc
+        List<MeasureDesc> measures = this.context.realization.getMeasures();
+        List<FunctionDesc> newAggrs = Lists.newArrayList();
+        for (FunctionDesc aggFunc : this.aggregations) {
+            newAggrs.add(findInMeasures(aggFunc, measures));
+        }
+        this.aggregations.clear();
+        this.aggregations.addAll(newAggrs);
+        this.context.aggregations.clear();
+        this.context.aggregations.addAll(newAggrs);
+    }
+
+    private FunctionDesc findInMeasures(FunctionDesc aggFunc, List<MeasureDesc> measures) {
+        for (MeasureDesc m : measures) {
+            if (aggFunc.equals(m.getFunction()))
+                return m.getFunction();
+        }
+        return aggFunc;
+    }
+
+    private void buildRewriteFieldsAndMetricsColumns() {
+        fillbackOptimizedColumn();
         ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
         for (int i = 0; i < this.aggregations.size(); i++) {
             FunctionDesc aggFunc = this.aggregations.get(i);
-            context.aggregations.add(aggFunc);
+            if (aggFunc.isDimensionAsMetric()) {
+                for (TblColRef col : aggFunc.getParameter().getColRefs()) {
+                    if (col != null) // same null guard the former QueryRouter adjustment applied
+                        this.context.groupByColumns.add(col);
+                }
+                continue; // skip rewrite, let calcite handle
+            }
             if (aggFunc.needRewrite()) {
                 String rewriteFieldName = aggFunc.getRewriteFieldName();
-                context.rewriteFields.put(rewriteFieldName, null);
+                this.context.rewriteFields.put(rewriteFieldName, null);
 
                 TblColRef column = buildRewriteColumn(aggFunc);
                 this.context.metricsColumns.add(column);
@@ -263,31 +322,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         }
     }
 
-    @Override
-    public void implementRewrite(RewriteImplementor implementor) {
-        implementor.visitChild(this, getInput());
-
-        // only rewrite the first aggregation
-        if (!this.afterAggregate && RewriteImplementor.needRewrite(this.context)) {
-            // rewrite the aggCalls
-            this.rewriteAggCalls = new ArrayList<AggregateCall>(aggCalls.size());
-            for (int i = 0; i < this.aggCalls.size(); i++) {
-                AggregateCall aggCall = this.aggCalls.get(i);
-                FunctionDesc cubeFunc = this.context.aggregations.get(i);
-                if (cubeFunc.needRewrite()) {
-                    aggCall = rewriteAggregateCall(aggCall, cubeFunc);
-                }
-                this.rewriteAggCalls.add(aggCall);
-            }
-        }
-
-        // rebuild rowType & columnRowType
-        //ClassUtil.updateFinalField(Aggregate.class, "aggCalls", this, rewriteAggCalls);
-        this.rowType = this.deriveRowType(); // this does not work coz super.aggCalls is final
-        this.columnRowType = this.buildColumnRowType();
-
-    }
-
     private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) {
 
         // rebuild parameters

http://git-wip-us.apache.org/repos/asf/kylin/blob/8c2d6580/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7493e08..c4d0fbe 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -23,7 +23,6 @@ import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
@@ -84,13 +83,6 @@ public class QueryRouter {
             if (inf instanceof DimensionAsMeasure) {
                 FunctionDesc functionDesc = ((DimensionAsMeasure) inf).getMeasureFunction();
                 functionDesc.setDimensionAsMetric(true);
-                olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
-                for (TblColRef col : functionDesc.getParameter().getColRefs()) {
-                    if (col != null) {
-                        olapContext.metricsColumns.remove(col);
-                        olapContext.groupByColumns.add(col);
-                    }
-                }
                 logger.info("Adjust DimensionAsMeasure for " + functionDesc);
             }
         }