Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/20 07:09:11 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d4cd28d  KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner
d4cd28d is described below

commit d4cd28d108c8f16aa1546434a9f311002d2d8010
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Mon Oct 19 16:13:13 2020 +0800

    KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner
    
    When executing the 'pruneSegments' function of FilePruner, if there are cast expressions in the filter, it throws 'UnsupportedOperationException: empty.reduceLeft'.
    
    Solution:
    Convert cast expressions in the filter to attributes before translating the filter.
---
 .../resources/query/sql_castprunesegs/query01.sql  | 22 +++++
 .../resources/query/sql_castprunesegs/query02.sql  | 22 +++++
 .../resources/query/sql_castprunesegs/query03.sql  | 31 +++++++
 .../sql/execution/datasource/FilePruner.scala      | 98 +++++++++++++++-------
 .../kylin/engine/spark2/NBuildAndQueryTest.java    |  1 +
 5 files changed, 145 insertions(+), 29 deletions(-)
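
For context, the root cause is that DataSourceStrategy.translateFilter returns None for predicates it cannot handle (such as comparisons whose attribute is wrapped in a Cast), so flatMap can yield an empty sequence, and Seq.empty.reduceLeft(And) throws 'UnsupportedOperationException: empty.reduceLeft'. Below is a minimal, self-contained sketch of the fix, assuming the Spark 2.4-style Catalyst and data source APIs this branch builds against; reducedFilter is an illustrative helper name, not the method in the patch:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    import org.apache.spark.sql.sources.{And => SourceAnd, Filter}

    // Strip a Cast wrapped around an attribute in attribute-vs-literal
    // comparisons so DataSourceStrategy.translateFilter can recognize them.
    // The real patch also handles EqualTo, LessThan, GreaterThanOrEqual and
    // LessThanOrEqual (in both operand orders) and recurses into And/Or/Not.
    def convertCastFilter(filter: Expression): Expression = filter match {
      case GreaterThan(Cast(a: Attribute, _, _), l: Literal) => GreaterThan(a, l)
      case GreaterThan(l: Literal, Cast(a: Attribute, _, _)) => GreaterThan(l, a)
      case _ => filter
    }

    // Guard the formerly failing reduceLeft: if nothing translates, return
    // None so the caller keeps all segments instead of throwing.
    def reducedFilter(filters: Seq[Expression]): Option[Filter] = {
      val translated = filters.map(convertCastFilter).flatMap(DataSourceStrategy.translateFilter)
      if (translated.isEmpty) None else Some(translated.reduceLeft(SourceAnd))
    }

Falling back to the unpruned segment list is safe because segment pruning is only an optimization; the new sql_castprunesegs queries below exercise this cast-in-filter path.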

diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
new file mode 100644
index 0000000..c0e64e3
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price)  as sum_price
+ FROM TEST_KYLIN_FACT 
+ WHERE CAL_DT > cast(TIMESTAMPADD(Day, -15000, CURRENT_DATE) as DATE)
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
new file mode 100644
index 0000000..5ea4572
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price)  as sum_price
+ FROM TEST_KYLIN_FACT 
+ WHERE CAL_DT > '2013-06-01'
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
new file mode 100644
index 0000000..99c92b2
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_cal_dt.cal_dt, sum(test_kylin_fact.price) as GMV 
+ , count(*) as TRANS_CNT 
+ from test_kylin_fact 
+inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
+ where
+ extract(DAY from test_cal_dt.cal_dt) = 12
+ group by test_cal_dt.cal_dt
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 2ecd4fd..70b7956 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -260,47 +260,54 @@ class FilePruner(
     val filteredStatuses = if (filters.isEmpty) {
       segDirs
     } else {
-      val reducedFilter = filters.flatMap(DataSourceStrategy.translateFilter).reduceLeft(And)
-      segDirs.filter {
-        e => {
-          val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
-          SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
-            case Trivial(true) => true
-            case Trivial(false) => false
+      val translatedFilter = filters.map(filter => convertCastFilter(filter))
+        .flatMap(DataSourceStrategy.translateFilter)
+      if (translatedFilter.isEmpty) {
+        logInfo("Can not use filters to prune segments.")
+        segDirs
+      } else {
+        val reducedFilter = translatedFilter.reduceLeft(And)
+        val pruned = segDirs.filter {
+          e => {
+            val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
+            SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+              case Trivial(true) => true
+              case Trivial(false) => false
+            }
           }
         }
+        logInfo(s"Selected files after segments pruning:" + pruned.map(_.segmentName))
+        pruned
       }
     }
-    logInfo(s"Selected files after segments pruning:" + filteredStatuses.map(_.segmentName))
     filteredStatuses
   }
 
-    private def pruneShards(
-      filters: Seq[Expression],
-      segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
-      val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
-        segDirs
-      } else {
-        val normalizedFiltersAndExpr = filters.reduce(expressions.And)
+  private def pruneShards(filters: Seq[Expression],
+                           segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+    val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
+      segDirs
+    } else {
+      val normalizedFiltersAndExpr = filters.reduce(expressions.And)
 
-        val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
-          val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
-          val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
-          require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
+      val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
+        val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
+        val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
+        require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
 
-          val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
 
-          val selected = files.filter(f => {
-            val partitionId = FilePruner.getPartitionId(f.getPath)
-            bitSet.get(partitionId)
-          })
-          SegmentDirectory(segName, segIdentifier, selected)
-        }
-        logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
-        pruned
+        val selected = files.filter(f => {
+          val partitionId = FilePruner.getPartitionId(f.getPath)
+          bitSet.get(partitionId)
+        })
+        SegmentDirectory(segName, segIdentifier, selected)
       }
-      filteredStatuses
+      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+      pruned
     }
+    filteredStatuses
+  }
 
   override lazy val inputFiles: Array[String] = Array.empty[String]
 
@@ -358,6 +365,39 @@ class FilePruner(
         matchedShards
     }
   }
+
+  //  translate for filter type match
+  private def convertCastFilter(filter: Expression): Expression = {
+    filter match {
+      case expressions.EqualTo(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.EqualTo(a, Literal(v, t))
+      case expressions.EqualTo(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.EqualTo(Literal(v, t), a)
+      case expressions.GreaterThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.GreaterThan(a, Literal(v, t))
+      case expressions.GreaterThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.GreaterThan(Literal(v, t), a)
+      case expressions.LessThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.LessThan(a, Literal(v, t))
+      case expressions.LessThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.LessThan(Literal(v, t), a)
+      case expressions.GreaterThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.GreaterThanOrEqual(a, Literal(v, t))
+      case expressions.GreaterThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.GreaterThanOrEqual(Literal(v, t), a)
+      case expressions.LessThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.LessThanOrEqual(a, Literal(v, t))
+      case expressions.LessThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.LessThanOrEqual(Literal(v, t), a)
+      case expressions.Or(left, right) =>
+        expressions.Or(convertCastFilter(left), convertCastFilter(right))
+      case expressions.And(left, right) =>
+        expressions.And(convertCastFilter(left), convertCastFilter(right))
+      case expressions.Not(child) =>
+        expressions.Not(convertCastFilter(child))
+      case _ => filter
+    }
+  }
 }
 
 object FilePruner {
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index fd70f9b..df72e8a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -153,6 +153,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_lookup"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_casewhen"));
+            tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_castprunesegs"));
 
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_like"));
             tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_cache"));