You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/19 16:03:04 UTC

[carbondata] branch master updated: [CARBONDATA-3294] Fix that MV datamap throw error when using count(1) and case when expression

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 85ec206  [CARBONDATA-3294] Fix that MV datamap throw error when using count(1) and case when expression
85ec206 is described below

commit 85ec206e670f769f6d7875c527941346924eff43
Author: qiuchenjian <80...@qq.com>
AuthorDate: Sat Feb 16 21:45:05 2019 +0800

    [CARBONDATA-3294] Fix that MV datamap throw error when using count(1) and case when expression
    
    [Problem]
    The MV datamap throws an error when a query uses count(1) together with a CASE WHEN expression. The error is:
    mismatched input 'FROM' expecting {<EOF>, 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 2, pos 0)
    == SQL ==
    SELECT MT.3600, MT.2250410101, countNum, rate
    FROM
    ^^^
    
    [Solution]
    The compacted SQL contains an extra 'case when' expression, which causes this error. The root cause is a bug in how the Window operator is transformed from the logical plan into the modular plan.
    
    This closes #3128
---
 .../mv/rewrite/MVCountAndCaseTestCase.scala        | 97 ++++++++++++++++++++++
 .../mv/plans/modular/ModularPatterns.scala         | 11 ++-
 2 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
new file mode 100644
index 0000000..567d6a9
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MVCountAndCaseTestCase extends QueryTest with BeforeAndAfterAll {
+
+  /** Creates the fact table the MV is built on and the dimension table the query joins. */
+  override def beforeAll(): Unit = {
+    drop()
+    sql("create table region(l4id string,l4name string) using carbondata")
+    sql("""create table data_table(
+          |starttime int, seq long,succ long,LAYER4ID string,tmp int)
+          |using carbondata""".stripMargin)
+  }
+
+  /** Drops the MV datamap and both tables; each statement is a no-op if the object is absent. */
+  def drop(): Unit = {
+    sql("drop datamap if exists data_table_mv")
+    sql("drop table if exists region")
+    sql("drop table if exists data_table")
+  }
+
+  // Regression test for CARBONDATA-3294: count(1) over() plus CASE WHEN over the MV aggregates
+  // used to make the rewritten SQL unparsable ("mismatched input 'FROM'").
+  test("test mv count and case when expression") {
+    sql("drop datamap if exists data_table_mv")
+    sql("""create datamap data_table_mv using 'mv' as
+          | SELECT STARTTIME,LAYER4ID,
+          | SUM(seq) AS seq_c,
+          | SUM(succ) AS succ_c
+          | FROM data_table
+          | GROUP BY STARTTIME,LAYER4ID""".stripMargin)
+    sql("rebuild datamap data_table_mv")
+
+    val frame = sql("""SELECT  MT.`3600` AS `3600`,
+                      | MT.`2250410101` AS `2250410101`,
+                      | count(1) over() as countNum,
+                      | (CASE WHEN (SUM(COALESCE(seq_c, 0))) = 0 THEN NULL
+                      |   ELSE
+                      |   (CASE WHEN (CAST((SUM(COALESCE(seq_c, 0))) AS int)) = 0 THEN 0
+                      |     ELSE ((CAST((SUM(COALESCE(succ_c, 0))) AS double))
+                      |     / (CAST((SUM(COALESCE(seq_c, 0))) AS double)))
+                      |     END) * 100
+                      |   END) AS rate
+                      | FROM (
+                      |   SELECT sum_result.*, H_REGION.`2250410101` FROM
+                      |   (SELECT cast(floor((starttime + 28800) / 3600) * 3600 - 28800 as int) AS `3600`,
+                      |     LAYER4ID,
+                      |     COALESCE(SUM(seq), 0) AS seq_c,
+                      |     COALESCE(SUM(succ), 0) AS succ_c
+                      |       FROM data_table
+                      |       WHERE STARTTIME >= 1549866600 AND STARTTIME < 1549899900
+                      |       GROUP BY cast(floor((STARTTIME + 28800) / 3600) * 3600 - 28800 as int),LAYER4ID
+                      |   )sum_result
+                      |   LEFT JOIN
+                      |   (SELECT l4id AS `225040101`,
+                      |     l4name AS `2250410101`,
+                      |     l4name AS NAME_2250410101
+                      |       FROM region
+                      |       GROUP BY l4id, l4name) H_REGION
+                      |   ON sum_result.LAYER4ID = H_REGION.`225040101`
+                      | WHERE H_REGION.NAME_2250410101 IS NOT NULL
+                      | ) MT
+                      | GROUP BY MT.`3600`, MT.`2250410101`
+                      | ORDER BY `3600` ASC LIMIT 5000""".stripMargin)
+
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
+  }
+
+  // True if the plan reads a relation whose catalog table is `<dataMapName>_table` (taken here
+  // as evidence the MV was used). Relations without a catalog table are skipped, not `.get`-ed.
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val tables = logicalPlan.collect { case l: LogicalRelation => l.catalogTable }.flatten
+    tables.exists(_.identifier.table.equalsIgnoreCase(s"${dataMapName}_table"))
+  }
+
+  override def afterAll(): Unit = drop()
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
index c546c6e..a4116d9 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -18,11 +18,11 @@
 package org.apache.carbondata.mv.plans.modular
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
-import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Limit, LogicalPlan, Window}
+import org.apache.spark.sql.catalyst.plans.logical._
 
 import org.apache.carbondata.mv.plans.{Pattern, _}
 import org.apache.carbondata.mv.plans.modular.Flags._
-import org.apache.carbondata.mv.plans.util.{ExtractGroupByModule, ExtractSelectModule, ExtractSelectModuleForWindow, ExtractTableModule, ExtractUnionModule}
+import org.apache.carbondata.mv.plans.util._
 
 object SimpleModularizer extends ModularPatterns {
   def patterns: Seq[Pattern] = {
@@ -134,8 +134,11 @@ abstract class ModularPatterns extends Modularizer[ModularPlan] {
         case Window(exprs, _, _,
           ExtractSelectModuleForWindow(output, input, predicate, aliasmap, joinedge, children,
             flags1, fspec1, wspec)) =>
-          val sel1 = makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
-            children.map(modularizeLater), fspec1, wspec)
+          val sel1 = plan.asInstanceOf[Window].child match {
+            case agg: Aggregate => children.map (modularizeLater)
+            case other => makeSelectModule (output, input, predicate, aliasmap, joinedge, flags1,
+              children.map (modularizeLater), fspec1, wspec)
+          }
           makeSelectModule(
             output.map(_.toAttribute),
             output.map(_.toAttribute),