You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/04/07 10:38:13 UTC

[carbondata] branch master updated: [CARBONDATA-3762] Block creating Materialized view's with duplicate column

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f35e4f  [CARBONDATA-3762] Block creating Materialized view's with duplicate column
6f35e4f is described below

commit 6f35e4f294874adce9e14aef9d9726b2a196a157
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Apr 1 19:04:08 2020 +0530

    [CARBONDATA-3762] Block creating Materialized view's with duplicate column
    
    Why is this PR needed?
    Currently, when a materialized view is created with duplicate columns:
    1. On creation, we take the distinct of the projected columns.
    2. Because of this, data is inserted wrongly during load and queries give wrong results.
    
    Materialized views can be mapped against queries having duplicate columns, without having duplicate columns in the mv table.
    
    What changes were proposed in this PR?
    1. Block creating materialized views with duplicate columns
    2. Fix bug in matching mv against queries having duplicate columns.
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #3690
---
 .../org/apache/carbondata/view/MVHelper.scala      | 45 +++++++++++++++++++-
 .../command/view/CarbonCreateMVCommand.scala       |  2 -
 .../org/apache/spark/sql/optimizer/MVMatcher.scala | 49 +++++++++++++++++++---
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 11 ++++-
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 45 +++++++++++++++++++-
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  |  8 +++-
 6 files changed, 147 insertions(+), 13 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
index 76c4ec4..af6d276 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
@@ -17,8 +17,10 @@
 
 package org.apache.carbondata.view
 
+import java.util
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.command.Field
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, ModularRelation, Select}
 import org.apache.carbondata.spark.util.CommonUtil
@@ -175,8 +178,13 @@ object MVHelper {
       projectList: Seq[NamedExpression],
       fieldCounter: AtomicInteger): mutable.LinkedHashMap[Field, MVField] = {
     val fieldsMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
+    // map of qualified name with list of column names
+    val fieldColumnsMap = new util.HashMap[String, java.util.ArrayList[String]]()
     projectList.map {
       case reference: AttributeReference =>
+        val columns = new util.ArrayList[String]()
+        columns.add(reference.qualifiedName)
+        findDuplicateColumns(fieldColumnsMap, reference.sql, columns, false)
         val relation = getRelation(relationList, reference)
         if (null != relation) {
           val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
@@ -205,7 +213,10 @@ object MVHelper {
           )
         }
 
-      case Alias(reference: AttributeReference, name) =>
+      case a@Alias(reference: AttributeReference, name) =>
+        val columns = new util.ArrayList[String]()
+        columns.add(reference.qualifiedName)
+        findDuplicateColumns(fieldColumnsMap, a.sql, columns, true)
         val relation = getRelation(relationList, reference)
         if (null != relation) {
           val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
@@ -236,8 +247,10 @@ object MVHelper {
         }
         val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
           new ArrayBuffer[RelatedFieldWrapper]()
+        val columns = new util.ArrayList[String]()
         alias.collect {
           case reference: AttributeReference =>
+            columns.add(reference.qualifiedName)
             val relation = getRelation(relationList, reference)
             if (null != relation) {
               relatedFields += RelatedFieldWrapper(
@@ -246,6 +259,7 @@ object MVHelper {
                 reference.name)
             }
         }
+       findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
         fieldsMap.put(
           newField(
             "",
@@ -267,8 +281,10 @@ object MVHelper {
         }
         val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
           new ArrayBuffer[RelatedFieldWrapper]()
+        val columns = new util.ArrayList[String]()
         alias.collect {
           case reference: AttributeReference =>
+            columns.add(reference.qualifiedName)
             val relation = getRelation(relationList, reference)
             if (null != relation) {
               relatedFields += RelatedFieldWrapper(
@@ -277,6 +293,7 @@ object MVHelper {
                 relation.identifier.table)
             }
         }
+        findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
         fieldsMap.put(
           newField(
             "",
@@ -291,6 +308,32 @@ object MVHelper {
     fieldsMap
   }
 
+  /**
+   * Rejects a projection that repeats one already recorded in fieldColumnsMap. The map key is
+   * the projection sql without its trailing alias; the value is the columns it references.
+   * Throws MalformedMVCommandException when the key exists and covers all given columns.
+   */
+  private def findDuplicateColumns(
+      fieldColumnsMap: util.HashMap[String, util.ArrayList[String]],
+      columnName: String,
+      columns: util.ArrayList[String],
+      isAlias: Boolean): Unit = {
+    // Cut at the last " AS `" so that an " AS" inside the expression, e.g. CAST(x AS INT), is
+    // kept in the key; fall back to the first " AS", and to the whole text if there is none.
+    val aliasStart =
+      Seq(columnName.lastIndexOf(" AS `"), columnName.indexOf(" AS")).find(_ >= 0)
+    val qualifiedName =
+      if (isAlias) aliasStart.fold(columnName)(columnName.substring(0, _)) else columnName
+    val recordedColumns = fieldColumnsMap.get(qualifiedName)
+    if (null == recordedColumns) {
+      // first occurrence; the stored lists are never null, so a null lookup means "absent"
+      fieldColumnsMap.put(qualifiedName, columns)
+    } else if (recordedColumns.containsAll(columns)) {
+      throw new MalformedMVCommandException(
+        "Cannot create mv with duplicate column: " + columnName)
+    }
+  }
+
   /**
    * Return the catalog table after matching the attr in logicalRelation
    */
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index b5430ce..12f1b27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -154,8 +154,6 @@ case class CarbonCreateMVCommand(
     val logicalPlan = MVHelper.dropDummyFunction(
       MVQueryParser.getQueryPlan(queryString, session))
     val modularPlan = checkQuery(session, logicalPlan)
-    // the ctas query can have duplicate columns, so we should take distinct and create fields,
-    // so that it won't fail during create mv table
     val viewSchema = getOutputSchema(logicalPlan)
     val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index 29471e3..9003627 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -1258,11 +1258,12 @@ private object SelectSelectGroupbyChildDelta
         Some(gb_2c@modular.GroupBy(
         _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _))
         ) =>
+        val distinctGrpByOList = getDistinctOutputList(gb_2q.outputList)
         if (sel_3q.predicateList.contains(exprE)) {
           val expr1E = exprE.transform {
             case attr: Attribute =>
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case alias: Alias if alias.toAttribute.semanticEquals(attr) => true;
                   case _ => false
                 }).getOrElse { attr }
@@ -1277,14 +1278,14 @@ private object SelectSelectGroupbyChildDelta
           exprE match {
             case attr: Attribute => // this subexpression must in subsumee select output list
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case a if a.toAttribute.semanticEquals(attr) => true;
                   case _ => false
                 })
 
             case alias: Alias =>
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case a if a.toAttribute.semanticEquals(alias.toAttribute) => true;
                   case _ => false
                 })
@@ -1342,6 +1343,40 @@ private object SelectSelectGroupbyChildDelta
     }
   }
 
+  /**
+   * Returns outputList with repeated projections dropped, keeping the first occurrence of each
+   * and preserving order, so that a query which projects a column more than once can be matched.
+   *
+   * An alias is identified by the text of simpleString that precedes " AS" rather than by its
+   * name: for a join query, sum(t1.column) and sum(t2.column) can carry the same alias name.
+   *
+   * @param outputList projections to de-duplicate
+   * @return the first occurrence of each distinct projection, in the original order
+   */
+  def getDistinctOutputList(outputList: Seq[NamedExpression]): Seq[NamedExpression] = {
+    // text of the aliased expression, i.e. everything before the first " AS"
+    def aliasedText(expression: NamedExpression): String = {
+      val text = expression.simpleString
+      text.substring(0, text.indexOf(" AS"))
+    }
+    outputList.foldLeft(Seq.empty[NamedExpression]) { (kept, candidate) =>
+      if (kept.isEmpty) {
+        kept :+ candidate
+      } else {
+        val candidateKey = candidate match {
+          case _: Alias => aliasedText(candidate)
+          case _ => candidate.name
+        }
+        // a kept alias is compared on expression text, anything else on its qualified name
+        val alreadyKept = kept.exists {
+          case keptAlias: Alias => aliasedText(keptAlias).equalsIgnoreCase(candidateKey)
+          case keptOther => keptOther.qualifiedName.equalsIgnoreCase(candidate.qualifiedName)
+        }
+        if (alreadyKept) kept else kept :+ candidate
+      }
+    }
+  }
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -1352,14 +1387,16 @@ private object SelectSelectGroupbyChildDelta
         sel_3a@modular.Select(
         _, _, Nil, _, _,
         Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
-        sel_3q@modular.Select(
+        sel_3q_dup@modular.Select(
         _, _, _, _, _,
         Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
         Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
         _ :: Nil,
         _ :: Nil) =>
         val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
-        val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+        val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => tbl }
+        val distinctSelOList = getDistinctOutputList(sel_3q_dup.outputList)
+        val sel_3q = sel_3q_dup.copy(outputList = distinctSelOList)
 
         val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
         val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
@@ -1423,7 +1460,7 @@ private object SelectSelectGroupbyChildDelta
               val aliasMap_exp = AttributeMap(
                 gb_2c.outputList.collect {
                   case a: Alias => (a.toAttribute, AliasWrapper(a)) })
-              val sel_3q_exp = sel_3q.transformExpressions({
+              val sel_3q_exp = sel_3q_dup.transformExpressions({
                 case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
               }).transformExpressions {
                 case AliasWrapper(alias: Alias) => alias
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 02c3cfd..8f9489a 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -1175,8 +1176,14 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     sql("drop table if exists maintable")
     sql("create table maintable(name string, age int, add string) STORED AS carbondata")
-    sql("create materialized view dupli_mv as select name, sum(age),sum(age) from maintable group by name")
-    sql("create materialized view dupli_projection as select age, age,add from maintable")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view dupli_mv as select name, sum(age),sum(age) from maintable group by name")
+    }.getMessage.contains("Cannot create mv with duplicate column: sum(maintable.age)")
+    sql("create materialized view dupli_mv as select name, sum(age) from maintable group by name")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view dupli_projection as select age, age,add from maintable")
+    }.getMessage.contains("Cannot create mv with duplicate column: maintable.age")
+    sql("create materialized view dupli_projection as select age,add from maintable")
     sql("create materialized view constant_mv as select name, sum(1) ex1 from maintable group by name")
     sql("insert into maintable select 'pheobe',31,'NY'")
     val df1 = sql("select sum(age),name from maintable group by name")
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 8b7a082..3ed7d43 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedMVCommandException}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -627,6 +627,49 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     sql("drop table IF EXISTS maintable")
   }
 
+  // Checks that queries repeating a projected column return the same rows as expected when an
+  // mv storing each column only once exists on the table.
+  // NOTE(review): the results of TestUtil.verifyMVDataMap are not asserted in this test;
+  // presumably it returns a Boolean that should be wrapped in assert(...) - confirm.
+  test("test duplicate column name in mv") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+    sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+    sql("drop materialized view if exists mv1")
+    // DataFrames built before any mv is created; used as the expected answers below
+    val res1 = sql("select name,sum(c_code) from maintable group by name")
+    val res2 = sql("select name, name,sum(c_code),sum(c_code) from maintable  group by name")
+    val res3 = sql("select c_code,price from maintable")
+    // group-by mv: checked with the same query and with one repeating both projections
+    sql("create materialized view mv1 as select name,sum(c_code) from maintable group by name")
+    val df1 = sql("select name,sum(c_code) from maintable group by name")
+    TestUtil.verifyMVDataMap(df1.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(res1, df1)
+    val df2 = sql("select name, name,sum(c_code),sum(c_code) from maintable  group by name")
+    TestUtil.verifyMVDataMap(df2.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(df2, res2)
+    // plain projection mv: checked with the same query and with repeated, reordered columns
+    sql("drop materialized view if exists mv2")
+    sql("create materialized view mv2 as select c_code,price from maintable")
+    val df3 = sql("select c_code,price from maintable")
+    TestUtil.verifyMVDataMap(df3.queryExecution.optimizedPlan, "mv2")
+    checkAnswer(res3, df3)
+    val df4 = sql("select c_code,price,price,c_code from maintable")
+    TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "mv2")
+    checkAnswer(df4, Seq(Row(21,2000,2000,21), Row(24,3000,3000,24)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test duplicate column with different alias name") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+    sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+    sql("drop materialized view if exists mv1")
+    // Each query projects one expression twice, with at most the alias differing, so creating
+    // the mv must fail. The message is checked inside assert(...): a bare `.contains(...)`
+    // result would be discarded. Only the prefix raised for a repeated projection is pinned.
+    Seq(
+      "select name,sum(c_code),sum(c_code) as a from maintable group by name",
+      "select name,name as a from maintable",
+      "select name as a,name as b from maintable").foreach { query =>
+      val message = intercept[MalformedMVCommandException] {
+        sql("create materialized view mv1 as " + query)
+      }.getMessage
+      assert(message.contains("Cannot create mv with duplicate column"), message)
+    }
+    sql("drop table IF EXISTS maintable")
+  }
+
   test("drop meta cache on mv materialized view table") {
     defaultConfig()
     sql("drop table IF EXISTS maintable")
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index c0d1b6e..fa05268 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.mv.rewrite.TestUtil
@@ -290,9 +291,14 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     dropDataMap("datamap1")
+    intercept[MalformedMVCommandException] {
+      sql(
+        "create materialized view datamap1 as " +
+        "select timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
+    }
     sql(
       "create materialized view datamap1 as " +
-      "select timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
+      "select timeseries(projectjoindate,'month') ,sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
     loadData("maintable")
     val df1 = sql("select timeseries(projectjoindate,'month') ,sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
     checkPlan("datamap1", df1)