You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/18 05:18:48 UTC

[carbondata] branch master updated: [CARBONDATA-3433]Fix MV issues related to duplicate columns, limit and constant columns

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 581591a  [CARBONDATA-3433]Fix MV issues related to duplicate columns, limit and constant columns
581591a is described below

commit 581591a383d3bf7216c6978315ec1377ac790d8a
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Thu Jun 13 13:42:19 2019 +0530

    [CARBONDATA-3433]Fix MV issues related to duplicate columns, limit and constant columns
    
    Problem:
    MV has below issues:
    when the select query has duplicate columns, MV creation fails, even though such a select query is valid
    when a constant column is used in the CTAS query for datamap creation, creation fails
    when limit is used in the CTAS query for datamap creation, creation fails
    
    Solution:
    since duplicate columns in a query are valid, MV should support them; therefore, when creating the table fields, take only the distinct columns so that MV table creation does not fail
    handle building the field-to-relation map correctly when the query contains a constant column
    block MV creation for a CTAS query containing limit, as limit is not a valid use case for an MV datamap.
    
    This closes #3285
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 18 ++++++++++--
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  |  9 ++++--
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 33 ++++++++++++++--------
 .../mv/rewrite/MVExceptionTestCase.scala           |  9 +++++-
 4 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 57082d7..4d43088 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -65,6 +65,13 @@ object MVHelper {
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
+    // if there is limit in MV ctas query string, throw exception, as its not a valid usecase
+    logicalPlan match {
+      case Limit(_, _) =>
+        throw new MalformedCarbonCommandException("MV datamap does not support the query with " +
+                                                  "limit")
+      case _ =>
+    }
     val selectTables = getTables(logicalPlan)
     if (selectTables.isEmpty) {
       throw new MalformedCarbonCommandException(
@@ -72,6 +79,8 @@ object MVHelper {
     }
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
+    // the ctas query can have duplicate columns, so we should take distinct and create fields,
+    // so that it won't fail during create mv table
     val fields = logicalPlan.output.map { attr =>
       if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
           attr.dataType.isInstanceOf[MapType]) {
@@ -96,7 +105,8 @@ object MVHelper {
           children = None,
           rawSchema = rawSchema)
       }
-    }
+    }.distinct
+
     val tableProperties = mutable.Map[String, String]()
     val parentTables = new util.ArrayList[String]()
     val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
@@ -403,7 +413,9 @@ object MVHelper {
 
   def getAttributeMap(subsumer: Seq[NamedExpression],
       subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
-    if (subsumer.length == subsume.length) {
+    // when datamap is created with duplicate columns like select sum(age),sum(age) from table,
+    // the subsumee will have duplicate, so handle that case here
+    if (subsumer.length == subsume.groupBy(_.name).size) {
       subsume.zip(subsumer).flatMap { case (left, right) =>
         var tuples = left collect {
           case attr: AttributeReference =>
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index b267203..8cb2f1f 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -129,7 +129,12 @@ object MVUtil {
       val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
       agg.collect {
         case Alias(attr: AggregateExpression, name) =>
-          if (attr.aggregateFunction.isInstanceOf[Count]) {
+          var isLiteralPresent = false
+          attr.aggregateFunction.collect {
+            case l@Literal(_, _) =>
+              isLiteralPresent = true
+          }
+          if (isLiteralPresent) {
             fieldToDataMapFieldMap +=
             getFieldToDataMapFields(name,
               attr.aggregateFunction.dataType,
@@ -137,7 +142,7 @@ object MVUtil {
               attr.aggregateFunction.nodeName,
               arrayBuffer,
               "")
-            aggregateType = "count"
+            aggregateType = attr.aggregateFunction.nodeName
           } else {
             aggregateType = attr.aggregateFunction.nodeName
           }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index a8abdc3..d136b27 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -19,8 +19,6 @@ package org.apache.carbondata.mv.rewrite
 import java.io.File
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -992,7 +990,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     var frame = sql(querySQL)
     var analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     assert(2 == frame.collect().size)
     frame.collect().foreach { each =>
       if (1 == each.get(0)) {
@@ -1008,7 +1006,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     frame = sql(querySQL2)
     analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     assert(1 == frame.collect().size)
     frame.collect().foreach { each =>
       if (2 == each.get(0)) {
@@ -1034,11 +1032,11 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     val df1 = sql(
       "select name,address from mv_like where Country NOT LIKE 'US' group by name,address")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "mvlikedm1"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "mvlikedm1"))
     val df2 = sql(
       "select name,address,Country from mv_like where Country = 'US' or Country = 'China' group by name,address,Country")
     val analyzed2 = df2.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed2, "mvlikedm2"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "mvlikedm2"))
   }
 
   test("test distinct, count, sum on MV with single projection column") {
@@ -1074,11 +1072,24 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists mvtable1")
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
+  test("test mv with duplicate columns in query and constant column") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
+    sql("create datamap dupli_mv using 'mv' as select name, sum(age),sum(age) from maintable group by name")
+    sql("create datamap constant_mv using 'mv' as select name, sum(1) ex1 from maintable group by name")
+    sql("insert into maintable select 'pheobe',31,'NY'")
+    val df1 = sql("select sum(age),name from maintable group by name")
+    val df2 = sql("select sum(age),sum(age),name from maintable group by name")
+    val df3 = sql("select name, sum(1) ex1 from maintable group by name")
+    val df4 = sql("select sum(1) ex1 from maintable group by name")
+    val analyzed1 = df1.queryExecution.analyzed
+    val analyzed2 = df2.queryExecution.analyzed
+    val analyzed3 = df3.queryExecution.analyzed
+    val analyzed4 = df4.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed1, "dupli_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "dupli_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed3, "constant_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
   }
 
   def drop(): Unit = {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
index 7823d46..b2e6376 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVExceptionTestCase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -42,6 +42,13 @@ class MVExceptionTestCase  extends QueryTest with BeforeAndAfterAll {
     assertResult("DataMap with name main_table_mv1 already exists in storage")(ex.getMessage)
   }
 
+  test("test mv creation with limit in query") {
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql("create datamap maintable_mv2 on table main_table using 'mv' as select sum(age),name from main_table group by name limit 10")
+    }
+    assertResult("MV datamap does not support the query with limit")(ex.getMessage)
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS main_table")
     sql("drop table if exists main_table_error")