You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/21 12:24:43 UTC

[carbondata] branch master updated: [CARBONDATA-3303] Fix that MV datamap return wrong results when using coalesce and less groupby columns

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a2b7d20  [CARBONDATA-3303] Fix that MV datamap return wrong results when using coalesce and less groupby columns
a2b7d20 is described below

commit a2b7d20339a8ee28e1695e8eac9e1afa2c3a5b03
Author: qiuchenjian <80...@qq.com>
AuthorDate: Tue Feb 26 14:50:26 2019 +0800

    [CARBONDATA-3303] Fix that MV datamap return wrong results when using coalesce and less groupby columns
    
    Problem
    An MV datamap returns wrong results when coalesce is used and the query SQL has fewer group-by columns than the MV SQL. For example:
    create table coalesce_test_main(id int,name string,height int,weight int) using carbondata
    insert into coalesce_test_main select 1,'tom',170,130
    insert into coalesce_test_main select 2,'tom',170,120
    insert into coalesce_test_main select 3,'lily',160,100
    create datamap coalesce_test_main_mv using 'mv' as select coalesce(sum(id),0) as sum_id,name as myname,weight from coalesce_test_main group by name,weight
    select coalesce(sum(id),0) as sumid,name from coalesce_test_main group by name
    The query returns the following wrong results (the correct results are "3 tom" and "3 lily"):
    1 tom
    2 tom
    3 lily
    
    Solution
    When the query SQL has fewer group-by columns than the MV SQL and the MV SQL contains a coalesce expression, the MV table can't calculate the right result, so the MV shouldn't take effect in this scenario
    
    This closes #3135
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 14 +++-
 .../carbondata/mv/rewrite/MVCoalesceTestCase.scala | 91 ++++++++++++++++++++++
 .../carbondata/mv/rewrite/MVRewriteTestCase.scala  |  4 +-
 3 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 6d0b2d3..810449c 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
@@ -184,6 +184,18 @@ object MVHelper {
     if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
       throw new UnsupportedOperationException("MV with same query present")
     }
+
+    var expressionValid = true
+    modularPlan.transformExpressions {
+      case coal@Coalesce(_) if coal.children.exists(
+        exp => exp.isInstanceOf[AggregateExpression]) =>
+        expressionValid = false
+        coal
+    }
+
+    if (!expressionValid) {
+      throw new UnsupportedOperationException("MV doesn't support Coalesce")
+    }
   }
 
   def updateColumnName(attr: Attribute): String = {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
new file mode 100644
index 0000000..f2a27c7
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
@@ -0,0 +1,91 @@
+/*
+	 * Licensed to the Apache Software Foundation (ASF) under one or more
+	 * contributor license agreements.  See the NOTICE file distributed with
+	 * this work for additional information regarding copyright ownership.
+	 * The ASF licenses this file to You under the Apache License, Version 2.0
+	 * (the "License"); you may not use this file except in compliance with
+	 * the License.  You may obtain a copy of the License at
+	 *
+	 *    http://www.apache.org/licenses/LICENSE-2.0
+	 *
+	 * Unless required by applicable law or agreed to in writing, software
+	 * distributed under the License is distributed on an "AS IS" BASIS,
+	 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+	 * See the License for the specific language governing permissions and
+	 * limitations under the License.
+	 */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MVCoalesceTestCase  extends QueryTest with BeforeAndAfterAll  {
+  override def beforeAll(): Unit = {
+    drop()
+    sql("create table coalesce_test_main(id int,name string,height int,weight int) " +
+      "using carbondata")
+    sql("insert into coalesce_test_main select 1,'tom',170,130")
+    sql("insert into coalesce_test_main select 2,'tom',170,120")
+    sql("insert into coalesce_test_main select 3,'lily',160,100")
+  }
+
+  def drop(): Unit = {
+    sql("drop table if exists coalesce_test_main")
+  }
+
+  test("test mv table with coalesce expression on sql not on mv and less groupby cols") {
+    sql("drop datamap if exists coalesce_test_main_mv")
+    sql("create datamap coalesce_test_main_mv using 'mv' as " +
+      "select sum(id) as sum_id,name as myname,weight from coalesce_test_main group by name,weight")
+    sql("rebuild datamap coalesce_test_main_mv")
+
+    val frame = sql("select coalesce(sum(id),0) as sumid,name from coalesce_test_main group by name")
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
+    checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
+
+    sql("drop datamap if exists coalesce_test_main_mv")
+  }
+
+  test("test mv table with coalesce expression less groupby cols") {
+    sql("drop datamap if exists coalesce_test_main_mv")
+    val exception: Exception = intercept[UnsupportedOperationException] {
+      sql("create datamap coalesce_test_main_mv using 'mv' as " +
+        "select coalesce(sum(id),0) as sum_id,name as myname,weight from coalesce_test_main group by name,weight")
+      sql("rebuild datamap coalesce_test_main_mv")
+    }
+    assert("MV doesn't support Coalesce".equals(exception.getMessage))
+
+    val frame = sql("select coalesce(sum(id),0) as sumid,name from coalesce_test_main group by name")
+    assert(!verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
+    checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
+
+    sql("drop datamap if exists coalesce_test_main_mv")
+  }
+
+  test("test mv table with coalesce expression in other expression") {
+    sql("drop datamap if exists coalesce_test_main_mv")
+    sql("create datamap coalesce_test_main_mv using 'mv' as " +
+      "select sum(coalesce(id,0)) as sum_id,name as myname,weight from coalesce_test_main group by name,weight")
+    sql("rebuild datamap coalesce_test_main_mv")
+
+    val frame = sql("select sum(coalesce(id,0)) as sumid,name from coalesce_test_main group by name")
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "coalesce_test_main_mv"))
+    checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
+
+    sql("drop datamap if exists coalesce_test_main_mv")
+  }
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+
+  override def afterAll(): Unit ={
+    drop
+  }
+}
\ No newline at end of file
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
index 3f5164f..3af1413 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
@@ -42,8 +42,8 @@ class MVRewriteTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists data_table_mv")
     sql(s"""create datamap data_table_mv using 'mv' as
            	           | SELECT STARTTIME,LAYER4ID,
-           	           | COALESCE (SUM(seq),0) AS seq_c,
-           	           | COALESCE (SUM(succ),0)  AS succ_c
+           	           |  SUM(seq) AS seq_c,
+           	           |  SUM(succ)  AS succ_c
            	           | FROM data_table
            	           | GROUP BY STARTTIME,LAYER4ID""".stripMargin)