You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/19 16:03:04 UTC
[carbondata] branch master updated: [CARBONDATA-3294] Fix that MV
datamap throw error when using count(1) and case when expression
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 85ec206 [CARBONDATA-3294] Fix that MV datamap throw error when using count(1) and case when expression
85ec206 is described below
commit 85ec206e670f769f6d7875c527941346924eff43
Author: qiuchenjian <80...@qq.com>
AuthorDate: Sat Feb 16 21:45:05 2019 +0800
[CARBONDATA-3294] Fix that MV datamap throw error when using count(1) and case when expression
[Problem]
The MV datamap throws an error when a query uses count(1) together with a 'case when' expression. The error is:
mismatched input 'FROM' expecting {<EOF>, 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 2, pos 0)
== SQL ==
SELECT MT.3600, MT.2250410101, countNum, rate
FROM
^^^
[Solution]
The compacted SQL contains an extra 'case when' expression, which causes this error. The root cause is a bug in how the Window operator is handled when transforming the logical plan into a modular plan.
This closes #3128
---
.../mv/rewrite/MVCountAndCaseTestCase.scala | 97 ++++++++++++++++++++++
.../mv/plans/modular/ModularPatterns.scala | 11 ++-
2 files changed, 104 insertions(+), 4 deletions(-)
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
new file mode 100644
index 0000000..567d6a9
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MVCountAndCaseTestCase extends QueryTest with BeforeAndAfterAll{
+
+
+ override def beforeAll(): Unit = {
+ drop
+ sql("create table region(l4id string,l4name string) using carbondata")
+ sql(
+ s"""create table data_table(
+ |starttime int, seq long,succ long,LAYER4ID string,tmp int)
+ |using carbondata""".stripMargin)
+ }
+
+ def drop(): Unit ={
+ sql("drop table if exists region")
+ sql("drop table if exists data_table")
+ }
+
+ test("test mv count and case when expression") {
+ sql("drop datamap if exists data_table_mv")
+ sql(s"""create datamap data_table_mv using 'mv' as
+ | SELECT STARTTIME,LAYER4ID,
+ | SUM(seq) AS seq_c,
+ | SUM(succ) AS succ_c
+ | FROM data_table
+ | GROUP BY STARTTIME,LAYER4ID""".stripMargin)
+
+ sql("rebuild datamap data_table_mv")
+
+ var frame = sql(s"""SELECT MT.`3600` AS `3600`,
+ | MT.`2250410101` AS `2250410101`,
+ | count(1) over() as countNum,
+ | (CASE WHEN (SUM(COALESCE(seq_c, 0))) = 0 THEN NULL
+ | ELSE
+ | (CASE WHEN (CAST((SUM(COALESCE(seq_c, 0))) AS int)) = 0 THEN 0
+ | ELSE ((CAST((SUM(COALESCE(succ_c, 0))) AS double))
+ | / (CAST((SUM(COALESCE(seq_c, 0))) AS double)))
+ | END) * 100
+ | END) AS rate
+ | FROM (
+ | SELECT sum_result.*, H_REGION.`2250410101` FROM
+ | (SELECT cast(floor((starttime + 28800) / 3600) * 3600 - 28800 as int) AS `3600`,
+ | LAYER4ID,
+ | COALESCE(SUM(seq), 0) AS seq_c,
+ | COALESCE(SUM(succ), 0) AS succ_c
+ | FROM data_table
+ | WHERE STARTTIME >= 1549866600 AND STARTTIME < 1549899900
+ | GROUP BY cast(floor((STARTTIME + 28800) / 3600) * 3600 - 28800 as int),LAYER4ID
+ | )sum_result
+ | LEFT JOIN
+ | (SELECT l4id AS `225040101`,
+ | l4name AS `2250410101`,
+ | l4name AS NAME_2250410101
+ | FROM region
+ | GROUP BY l4id, l4name) H_REGION
+ | ON sum_result.LAYER4ID = H_REGION.`225040101`
+ | WHERE H_REGION.NAME_2250410101 IS NOT NULL
+ | ) MT
+ | GROUP BY MT.`3600`, MT.`2250410101`
+ | ORDER BY `3600` ASC LIMIT 5000""".stripMargin)
+
+ assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
+ }
+
+ def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+ val tables = logicalPlan collect {
+ case l: LogicalRelation => l.catalogTable.get
+ }
+ tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+ }
+
+ override def afterAll(): Unit = {
+ drop
+ }
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index c546c6e..a4116d9 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -18,11 +18,11 @@
package org.apache.carbondata.mv.plans.modular
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
-import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Limit, LogicalPlan, Window}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.carbondata.mv.plans.{Pattern, _}
import org.apache.carbondata.mv.plans.modular.Flags._
-import org.apache.carbondata.mv.plans.util.{ExtractGroupByModule, ExtractSelectModule, ExtractSelectModuleForWindow, ExtractTableModule, ExtractUnionModule}
+import org.apache.carbondata.mv.plans.util._
object SimpleModularizer extends ModularPatterns {
def patterns: Seq[Pattern] = {
@@ -134,8 +134,11 @@ abstract class ModularPatterns extends Modularizer[ModularPlan] {
case Window(exprs, _, _,
ExtractSelectModuleForWindow(output, input, predicate, aliasmap, joinedge, children,
flags1, fspec1, wspec)) =>
- val sel1 = makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
- children.map(modularizeLater), fspec1, wspec)
+ val sel1 = plan.asInstanceOf[Window].child match {
+ case agg: Aggregate => children.map (modularizeLater)
+ case other => makeSelectModule (output, input, predicate, aliasmap, joinedge, flags1,
+ children.map (modularizeLater), fspec1, wspec)
+ }
makeSelectModule(
output.map(_.toAttribute),
output.map(_.toAttribute),