You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/20 07:09:11 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4791 Fix
exception 'UnsupportedOperationException: empty.reduceLeft' when there are
cast expressions in the filters of FilePruner
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new d4cd28d KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner
d4cd28d is described below
commit d4cd28d108c8f16aa1546434a9f311002d2d8010
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Mon Oct 19 16:13:13 2020 +0800
KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner
When executing the function 'pruneSegments' of FilePruner, if there are cast expressions in the filter, it will throw the exception 'UnsupportedOperationException: empty.reduceLeft'.
Solution:
Convert cast expressions in the filter to attributes before translating the filter.
---
.../resources/query/sql_castprunesegs/query01.sql | 22 +++++
.../resources/query/sql_castprunesegs/query02.sql | 22 +++++
.../resources/query/sql_castprunesegs/query03.sql | 31 +++++++
.../sql/execution/datasource/FilePruner.scala | 98 +++++++++++++++-------
.../kylin/engine/spark2/NBuildAndQueryTest.java | 1 +
5 files changed, 145 insertions(+), 29 deletions(-)
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
new file mode 100644
index 0000000..c0e64e3
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price) as sum_price
+ FROM TEST_KYLIN_FACT
+ WHERE CAL_DT > cast(TIMESTAMPADD(Day, -15000, CURRENT_DATE) as DATE)
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
new file mode 100644
index 0000000..5ea4572
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price) as sum_price
+ FROM TEST_KYLIN_FACT
+ WHERE CAL_DT > '2013-06-01'
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
new file mode 100644
index 0000000..99c92b2
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_cal_dt.cal_dt, sum(test_kylin_fact.price) as GMV
+ , count(*) as TRANS_CNT
+ from test_kylin_fact
+inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
+ where
+ extract(DAY from test_cal_dt.cal_dt) = 12
+ group by test_cal_dt.cal_dt
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 2ecd4fd..70b7956 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -260,47 +260,54 @@ class FilePruner(
val filteredStatuses = if (filters.isEmpty) {
segDirs
} else {
- val reducedFilter = filters.flatMap(DataSourceStrategy.translateFilter).reduceLeft(And)
- segDirs.filter {
- e => {
- val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
- SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
- case Trivial(true) => true
- case Trivial(false) => false
+ val translatedFilter = filters.map(filter => convertCastFilter(filter))
+ .flatMap(DataSourceStrategy.translateFilter)
+ if (translatedFilter.isEmpty) {
+ logInfo("Can not use filters to prune segments.")
+ segDirs
+ } else {
+ val reducedFilter = translatedFilter.reduceLeft(And)
+ val pruned = segDirs.filter {
+ e => {
+ val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
+ SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+ case Trivial(true) => true
+ case Trivial(false) => false
+ }
}
}
+ logInfo(s"Selected files after segments pruning:" + pruned.map(_.segmentName))
+ pruned
}
}
- logInfo(s"Selected files after segments pruning:" + filteredStatuses.map(_.segmentName))
filteredStatuses
}
- private def pruneShards(
- filters: Seq[Expression],
- segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
- val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
- segDirs
- } else {
- val normalizedFiltersAndExpr = filters.reduce(expressions.And)
+ private def pruneShards(filters: Seq[Expression],
+ segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+ val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
+ segDirs
+ } else {
+ val normalizedFiltersAndExpr = filters.reduce(expressions.And)
- val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
- val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
- val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
- require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
+ val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
+ val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
+ val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
+ require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
- val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+ val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
- val selected = files.filter(f => {
- val partitionId = FilePruner.getPartitionId(f.getPath)
- bitSet.get(partitionId)
- })
- SegmentDirectory(segName, segIdentifier, selected)
- }
- logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
- pruned
+ val selected = files.filter(f => {
+ val partitionId = FilePruner.getPartitionId(f.getPath)
+ bitSet.get(partitionId)
+ })
+ SegmentDirectory(segName, segIdentifier, selected)
}
- filteredStatuses
+ logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+ pruned
}
+ filteredStatuses
+ }
override lazy val inputFiles: Array[String] = Array.empty[String]
@@ -358,6 +365,39 @@ class FilePruner(
matchedShards
}
}
+
+ // translate for filter type match
+ private def convertCastFilter(filter: Expression): Expression = {
+ filter match {
+ case expressions.EqualTo(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+ expressions.EqualTo(a, Literal(v, t))
+ case expressions.EqualTo(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+ expressions.EqualTo(Literal(v, t), a)
+ case expressions.GreaterThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+ expressions.GreaterThan(a, Literal(v, t))
+ case expressions.GreaterThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+ expressions.GreaterThan(Literal(v, t), a)
+ case expressions.LessThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+ expressions.LessThan(a, Literal(v, t))
+ case expressions.LessThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+ expressions.LessThan(Literal(v, t), a)
+ case expressions.GreaterThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+ expressions.GreaterThanOrEqual(a, Literal(v, t))
+ case expressions.GreaterThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+ expressions.GreaterThanOrEqual(Literal(v, t), a)
+ case expressions.LessThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+ expressions.LessThanOrEqual(a, Literal(v, t))
+ case expressions.LessThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+ expressions.LessThanOrEqual(Literal(v, t), a)
+ case expressions.Or(left, right) =>
+ expressions.Or(convertCastFilter(left), convertCastFilter(right))
+ case expressions.And(left, right) =>
+ expressions.And(convertCastFilter(left), convertCastFilter(right))
+ case expressions.Not(child) =>
+ expressions.Not(convertCastFilter(child))
+ case _ => filter
+ }
+ }
}
object FilePruner {
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index fd70f9b..df72e8a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -153,6 +153,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_lookup"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_casewhen"));
+ tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_castprunesegs"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_like"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_cache"));