You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bo...@apache.org on 2019/08/11 12:50:57 UTC

[kylin] branch master updated: KYLIN-2820 Query can't read window function's result from subquery

This is an automated email from the ASF dual-hosted git repository.

boblu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new abc0ab8  KYLIN-2820 Query can't read window function's result from subquery
abc0ab8 is described below

commit abc0ab80a04f1bab95f93901f1b802aecaf72382
Author: nichunen <ni...@apache.org>
AuthorDate: Sun Aug 11 18:56:57 2019 +0800

    KYLIN-2820 Query can't read window function's result from subquery
---
 .../org/apache/kylin/metadata/model/TblColRef.java | 104 ++++++++++++---------
 .../test/resources/query/sql_window/query13.sql    |  22 +++++
 .../apache/kylin/query/relnode/ColumnRowType.java  |  14 ++-
 .../kylin/query/relnode/OLAPAggregateRel.java      |  50 +++++-----
 .../apache/kylin/query/relnode/OLAPProjectRel.java |  26 ++++--
 .../apache/kylin/query/relnode/OLAPWindowRel.java  |  11 +++
 .../relnode/visitor/TupleExpressionVisitor.java    |   7 +-
 7 files changed, 153 insertions(+), 81 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 0dc08a9..960cc19 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -21,12 +21,13 @@ package org.apache.kylin.metadata.model;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
-
+import java.util.List;
 import java.util.Locale;
 import java.util.function.Function;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.expression.TupleExpression;
 
 /**
  */
@@ -34,27 +35,36 @@ import org.apache.kylin.metadata.datatype.DataType;
 public class TblColRef implements Serializable {
 
     private static final String INNER_TABLE_NAME = "_kylin_table";
+    private static final DataModelDesc UNKNOWN_MODEL = new DataModelDesc();
 
-    // used by projection rewrite, see OLAPProjectRel
-    public enum InnerDataTypeEnum {
-
-        LITERAL("_literal_type"), DERIVED("_derived_type");
-
-        private final String dateType;
+    static {
+        UNKNOWN_MODEL.setName("UNKNOWN_MODEL");
+    }
 
-        private InnerDataTypeEnum(String name) {
-            this.dateType = name;
-        }
+    private TableRef table;
+    private TableRef backupTable;// only used in fixTableRef()
+    private ColumnDesc column;
+    private String identity;
+    private String parserDescription;
+    // window aggregate columns only: partition/order key and argument expressions (set in OLAPWindowRel)
+    private List<TupleExpression> subTupleExps;
+    /**
+     * Function used to get quoted identifier
+     */
+    private transient Function<TblColRef, String> quotedFunc;
 
-        public String getDataType() {
-            return dateType;
-        }
+    TblColRef(ColumnDesc column) {
+        this.column = column;
+    }
 
-        public static boolean contains(String name) {
-            return LITERAL.getDataType().equals(name) || DERIVED.getDataType().equals(name);
-        }
+    TblColRef(TableRef table, ColumnDesc column) {
+        checkArgument(table.getTableDesc().getIdentity().equals(column.getTable().getIdentity()));
+        this.table = table;
+        this.column = column;
     }
 
+    // ============================================================================
+
     // used by projection rewrite, see OLAPProjectRel
     public static TblColRef newInnerColumn(String columnName, InnerDataTypeEnum dataType) {
         return newInnerColumn(columnName, dataType, null);
@@ -72,11 +82,6 @@ public class TblColRef implements Serializable {
         return colRef;
     }
 
-    private static final DataModelDesc UNKNOWN_MODEL = new DataModelDesc();
-    static {
-        UNKNOWN_MODEL.setName("UNKNOWN_MODEL");
-    }
-
     public static TableRef tableForUnknownModel(String tempTableAlias, TableDesc table) {
         return new TableRef(UNKNOWN_MODEL, tempTableAlias, table, false);
     }
@@ -103,7 +108,8 @@ public class TblColRef implements Serializable {
     }
 
     // for test mainly
-    public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, String comment) {
+    public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype,
+            String comment) {
         ColumnDesc desc = new ColumnDesc();
         String id = "" + oneBasedColumnIndex;
         desc.setId(id);
@@ -114,33 +120,10 @@ public class TblColRef implements Serializable {
         return new TblColRef(desc);
     }
 
-    // ============================================================================
-
-    private TableRef table;
-    private TableRef backupTable;// only used in fixTableRef()
-    private ColumnDesc column;
-    private String identity;
-    private String parserDescription;
-
-    /**
-     * Function used to get quoted identitier
-     */
-    private transient Function<TblColRef, String> quotedFunc;
-
     public void setQuotedFunc(Function<TblColRef, String> quotedFunc) {
         this.quotedFunc = quotedFunc;
     }
 
-    TblColRef(ColumnDesc column) {
-        this.column = column;
-    }
-
-    TblColRef(TableRef table, ColumnDesc column) {
-        checkArgument(table.getTableDesc().getIdentity().equals(column.getTable().getIdentity()));
-        this.table = table;
-        this.column = column;
-    }
-
     public void fixTableRef(TableRef tableRef) {
         this.backupTable = this.table;
         this.table = tableRef;
@@ -199,9 +182,18 @@ public class TblColRef implements Serializable {
         return column.getType();
     }
 
-    public String getBackupTableAlias(){
+    public List<TupleExpression> getSubTupleExps() {
+        return subTupleExps;
+    }
+
+    public void setSubTupleExps(List<TupleExpression> subTupleExps) {
+        this.subTupleExps = subTupleExps;
+    }
+
+    public String getBackupTableAlias() {
         return backupTable.getAlias();
     }
+
     private void markInnerColumn(InnerDataTypeEnum dataType) {
         this.column.setDatatype(dataType.getDataType());
         this.column.getTable().setName(INNER_TABLE_NAME);
@@ -286,4 +278,24 @@ public class TblColRef implements Serializable {
     public String getColumWithTableAndSchema() {
         return (getTableWithSchema() + "." + column.getName()).toUpperCase(Locale.ROOT);
     }
+
+    // used by projection rewrite, see OLAPProjectRel
+    public enum InnerDataTypeEnum {
+
+        LITERAL("_literal_type"), DERIVED("_derived_type");
+
+        private final String dataType;
+
+        private InnerDataTypeEnum(String name) {
+            this.dataType = name;
+        }
+
+        public static boolean contains(String name) {
+            return LITERAL.getDataType().equals(name) || DERIVED.getDataType().equals(name);
+        }
+
+        public String getDataType() {
+            return dataType;
+        }
+    }
 }
diff --git a/kylin-it/src/test/resources/query/sql_window/query13.sql b/kylin-it/src/test/resources/query/sql_window/query13.sql
new file mode 100644
index 0000000..9807fd6
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_window/query13.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+select t.first_seller_id as first_seller_id_test, count(*) from (
+select first_value(seller_id) over (partition by buyer_id) as first_seller_id
+from test_kylin_fact inner join test_order on test_kylin_fact.order_id=test_order.order_id
+)
+as t group by t.first_seller_id
\ No newline at end of file
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
index 65a00e6..941d341 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.query.relnode;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kylin.common.util.Pair;
@@ -81,7 +82,7 @@ public class ColumnRowType {
         return new Pair<>(oldCol, oldExpr);
     }
 
-    public TupleExpression getSourceColumnsByIndex(int i) {
+    public TupleExpression getTupleExpressionByIndex(int i) {
         TupleExpression result = null;
         if (sourceColumns != null) {
             result = sourceColumns.get(i);
@@ -92,6 +93,17 @@ public class ColumnRowType {
         return result;
     }
 
+    public List<TupleExpression> getSourceColumns() {
+        if (sourceColumns == null) {
+            List<TupleExpression> sources = new ArrayList<>();
+            for (int i = 0; i < columns.size(); i++) {
+                sources.add(getTupleExpressionByIndex(i));
+            }
+            sourceColumns = sources;
+        }
+        return sourceColumns;
+    }
+
     public List<TblColRef> getAllColumns() {
         return columns;
     }
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 9def697..344a34e 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.query.relnode;
 
+import static org.apache.kylin.metadata.expression.TupleExpression.ExpressionOperatorEnum.COLUMN;
+
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -85,7 +87,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import static org.apache.kylin.metadata.expression.TupleExpression.ExpressionOperatorEnum.COLUMN;
 /**
  */
 public class OLAPAggregateRel extends Aggregate implements OLAPRel {
@@ -111,30 +112,14 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         for (String func : udafs.keySet()) {
             try {
                 AGGR_FUNC_PARAM_AS_MEASURE_MAP.put(func,
-                        ((ParamAsMeasureCount) (udafs.get(func).getDeclaredConstructor().newInstance())).getParamAsMeasureCount());
+                        ((ParamAsMeasureCount) (udafs.get(func).getDeclaredConstructor().newInstance()))
+                                .getParamAsMeasureCount());
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
     }
 
-    static String getSqlFuncName(AggregateCall aggCall) {
-        String sqlName = aggCall.getAggregation().getName();
-        if (aggCall.isDistinct()) {
-            sqlName = sqlName + "_DISTINCT";
-        }
-        return sqlName;
-    }
-
-    public static String getAggrFuncName(AggregateCall aggCall) {
-        String sqlName = getSqlFuncName(aggCall);
-        String funcName = AGGR_FUNC_MAP.get(sqlName);
-        if (funcName == null) {
-            throw new IllegalStateException("Non-support aggregation " + sqlName);
-        }
-        return funcName;
-    }
-
     OLAPContext context;
     ColumnRowType columnRowType;
     private boolean afterAggregate;
@@ -143,7 +128,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
     private List<TblColRef> groups;
     private List<FunctionDesc> aggregations;
     private boolean rewriting;
-
     public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator,
             ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls)
             throws InvalidRelException {
@@ -154,6 +138,23 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         this.rowType = getRowType();
     }
 
+    static String getSqlFuncName(AggregateCall aggCall) {
+        String sqlName = aggCall.getAggregation().getName();
+        if (aggCall.isDistinct()) {
+            sqlName = sqlName + "_DISTINCT";
+        }
+        return sqlName;
+    }
+
+    public static String getAggrFuncName(AggregateCall aggCall) {
+        String sqlName = getSqlFuncName(aggCall);
+        String funcName = AGGR_FUNC_MAP.get(sqlName);
+        if (funcName == null) {
+            throw new IllegalStateException("Non-support aggregation " + sqlName);
+        }
+        return funcName;
+    }
+
     @Override
     public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, ImmutableBitSet groupSet,
             List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
@@ -268,10 +269,11 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
         this.groups = Lists.newArrayList();
         for (int i = getGroupSet().nextSetBit(0); i >= 0; i = getGroupSet().nextSetBit(i + 1)) {
-            TupleExpression tupleExpression = inputColumnRowType.getSourceColumnsByIndex(i);
+            TupleExpression tupleExpression = inputColumnRowType.getTupleExpressionByIndex(i);
 
             // group by column with operator
-            if (this.context.groupByExpression == false && !(COLUMN.equals(tupleExpression.getOperator()) && tupleExpression.getChildren().isEmpty())) {
+            if (this.context.groupByExpression == false
+                    && !(COLUMN.equals(tupleExpression.getOperator()) && tupleExpression.getChildren().isEmpty())) {
                 this.context.groupByExpression = true;
             }
 
@@ -350,7 +352,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             // Check dynamic aggregation
             if (this.context.isDynamicColumnEnabled() && !afterAggregate && !rewriting && argList.size() == 1) {
                 int iRowIdx = argList.get(0);
-                TupleExpression tupleExpr = inputColumnRowType.getSourceColumnsByIndex(iRowIdx);
+                TupleExpression tupleExpr = inputColumnRowType.getTupleExpressionByIndex(iRowIdx);
                 if (aggCall.getAggregation() instanceof SqlSumAggFunction
                         || aggCall.getAggregation() instanceof SqlSumEmptyIsZeroAggFunction) {
                     // sum (expression)
@@ -571,7 +573,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         String callName = getSqlFuncName(aggCall);
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
-        
+
         Map<String, Class<?>> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions();
         if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
index 3163376..59d2c38 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
@@ -63,6 +63,7 @@ import org.apache.kylin.metadata.expression.ColumnTupleExpression;
 import org.apache.kylin.metadata.expression.ExpressionColCollector;
 import org.apache.kylin.metadata.expression.NoneTupleExpression;
 import org.apache.kylin.metadata.expression.NumberTupleExpression;
+import org.apache.kylin.metadata.expression.RexCallTupleExpression;
 import org.apache.kylin.metadata.expression.StringTupleExpression;
 import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -80,8 +81,14 @@ import com.google.common.collect.Maps;
  */
 public class OLAPProjectRel extends Project implements OLAPRel {
 
-    OLAPContext context;
+    private final BasicSqlType dateType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(),
+            SqlTypeName.DATE);
+    private final BasicSqlType timestampType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(),
+            SqlTypeName.TIMESTAMP);
+    private final ArraySqlType dateArrayType = new ArraySqlType(dateType, true);
+    private final ArraySqlType timestampArrayType = new ArraySqlType(timestampType, true);
     public List<RexNode> rewriteProjects;
+    OLAPContext context;
     boolean rewriting;
     ColumnRowType columnRowType;
     boolean hasJoin;
@@ -89,12 +96,6 @@ public class OLAPProjectRel extends Project implements OLAPRel {
     boolean afterAggregate;
     boolean isMerelyPermutation = false;//project additionally added by OLAPJoinPushThroughJoinRule
     private int caseCount = 0;
-
-    private final BasicSqlType dateType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), SqlTypeName.DATE);
-    private final BasicSqlType timestampType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), SqlTypeName.TIMESTAMP);
-    private final ArraySqlType dateArrayType = new ArraySqlType(dateType, true);
-    private final ArraySqlType timestampArrayType = new ArraySqlType(timestampType, true);
-
     /**
      * A flag indicate whether has intersect_count in query
      */
@@ -187,7 +188,7 @@ public class OLAPProjectRel extends Project implements OLAPRel {
             String fieldName = columnField.getName();
 
             TupleExpression tupleExpr = rex.accept(visitor);
-            TblColRef column = translateRexNode(tupleExpr, fieldName);
+            TblColRef column = translateRexNode(rex, inputColumnRowType, tupleExpr, fieldName);
             if (!this.rewriting && !this.afterAggregate && !isMerelyPermutation) {
                 Set<TblColRef> srcCols = ExpressionColCollector.collectColumns(tupleExpr);
                 // remove cols not belonging to context tables
@@ -219,7 +220,8 @@ public class OLAPProjectRel extends Project implements OLAPRel {
         return new ColumnRowType(columns, sourceColumns);
     }
 
-    private TblColRef translateRexNode(TupleExpression tupleExpr, String fieldName) {
+    private TblColRef translateRexNode(RexNode rexNode, ColumnRowType inputColumnRowType, TupleExpression tupleExpr,
+            String fieldName) {
         if (tupleExpr instanceof ColumnTupleExpression) {
             return ((ColumnTupleExpression) tupleExpr).getColumn();
         } else if (tupleExpr instanceof NumberTupleExpression) {
@@ -228,6 +230,12 @@ public class OLAPProjectRel extends Project implements OLAPRel {
         } else if (tupleExpr instanceof StringTupleExpression) {
             Object value = ((StringTupleExpression) tupleExpr).getValue();
             return TblColRef.newInnerColumn(value == null ? "null" : value.toString(), InnerDataTypeEnum.LITERAL);
+        } else if (tupleExpr instanceof RexCallTupleExpression && rexNode instanceof RexInputRef) {
+            RexInputRef inputRef = (RexInputRef) rexNode;
+            int index = inputRef.getIndex();
+            if (index < inputColumnRowType.size()) {
+                return inputColumnRowType.getColumnByIndex(index);
+            }
         }
         return TblColRef.newInnerColumn(fieldName, InnerDataTypeEnum.LITERAL, tupleExpr.getDigest());
     }
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java
index a4ca1b7..7c9721a 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java
@@ -29,6 +29,7 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -36,6 +37,8 @@ import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
+import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
@@ -93,9 +96,17 @@ public class OLAPWindowRel extends Window implements OLAPRel {
 
         // add window aggregate calls column
         for (Group group : groups) {
+            List<TupleExpression> sourceColOuter = Lists.newArrayList();
+            group.keys.asSet().stream().map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColOuter::add);
+            group.orderKeys.getFieldCollations().stream().map(RelFieldCollation::getFieldIndex)
+                    .map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColOuter::add);
             for (AggregateCall aggrCall : group.getAggregateCalls(this)) {
                 TblColRef aggrCallCol = TblColRef.newInnerColumn(aggrCall.getName(),
                         TblColRef.InnerDataTypeEnum.LITERAL);
+                List<TupleExpression> sourceColInner = Lists.newArrayList(sourceColOuter.iterator());
+                aggrCall.getArgList().stream().filter(i -> i < inputColumnRowType.size())
+                        .map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColInner::add);
+                aggrCallCol.setSubTupleExps(sourceColInner);
                 columns.add(aggrCallCol);
             }
         }
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java
index 03b58dc..0279397 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java
@@ -158,7 +158,12 @@ public class TupleExpressionVisitor extends RexVisitorImpl<TupleExpression> {
         // check it for rewrite count
         if (index < inputRowType.size()) {
             TblColRef column = inputRowType.getColumnByIndex(index);
-            TupleExpression tuple = new ColumnTupleExpression(column);
+            TupleExpression tuple;
+            if (column.getSubTupleExps() != null) {
+                tuple = new RexCallTupleExpression(column.getSubTupleExps());
+            } else {
+                tuple = new ColumnTupleExpression(column);
+            }
             tuple.setDigest(inputRef.toString());
             return tuple;
         } else {