You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/21 12:13:11 UTC

[carbondata] branch master updated: [CARBONDATA-3444]Fix MV query failure when column name and table name is same in case of join scenario

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b0e79c  [CARBONDATA-3444]Fix MV query failure when column name and table name is same in case of join scenario
2b0e79c is described below

commit 2b0e79c66357fce671c5f421fd5c400e28f69fde
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Wed Jun 19 12:52:30 2019 +0530

    [CARBONDATA-3444]Fix MV query failure when column name and table name is same in case of join scenario
    
    Problem:
    When columns with the same name exist in different tables, after SQL generation the projected column will look like gen_subsumer_0.product,
    and logical plan generation from the rewritten query fails because the column names are ambiguous.
    
    Solution:
    Update the output list when the query contains duplicate columns, by forming a qualified name for the attribute reference.
    When a qualifier is defined for the column, the qualified name will be <col_qualifier_name>_<col.name>;
    if no qualifier is defined, it will be <col_exprId_id>_<col.name>. This update is applied to all the nodes, such as group-by and select nodes,
    so that ambiguity in column names is handled.
    
    This closes #3297
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  4 +-
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 41 ++++++++++++++++++++
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  | 20 +++-------
 .../apache/carbondata/mv/rewrite/Navigator.scala   | 16 +++++---
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 45 ++++++++++++++++++++++
 5 files changed, 105 insertions(+), 21 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 4d43088..c0831ae 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -583,7 +583,9 @@ object MVHelper {
         val relation =
           s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
         val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
-        val mappings = s.outputList zip outputList
+        // When the output list projects the same column more than once but the relation
+        // contains only distinct columns, the zip below can pair up the wrong columns, so use distinct
+        val mappings = s.outputList.distinct zip outputList
         val oList = for ((o1, o2) <- mappings) yield {
           if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
         }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 8cb2f1f..4dff5b8 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -310,4 +310,45 @@ object MVUtil {
         " are not allowed for this datamap")
     }
   }
+
+  def updateDuplicateColumns(outputList: Seq[NamedExpression]): Seq[NamedExpression] = {
+    val duplicateNameCols = outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2)
+      .toList
+    val updatedOutList = outputList.map { col =>
+      val duplicateColumn = duplicateNameCols
+        .find(a => a.semanticEquals(col))
+      val qualifiedName = col.qualifier.getOrElse(s"${ col.exprId.id }") + "_" + col.name
+      if (duplicateColumn.isDefined) {
+        val attributesOfDuplicateCol = duplicateColumn.get.collect {
+          case a: AttributeReference => a
+        }
+        val attributeOfCol = col.collect { case a: AttributeReference => a }
+        // Check whether the duplicate columns come from the same table. A query that projects
+        // the same column more than once is valid, so such columns must not be renamed blindly
+        // to the qualified name built above. For an expression such as
+        // cast((FLOOR((cast(col_name) as double)))..., the top-level exprId may even be equal, so
+        // collect the AttributeReference (col_name) nested inside and compare those instead,
+        // using semanticEquals to tell whether they are the same attribute of the same table.
+        val isStrictDuplicate = attributesOfDuplicateCol.forall(expr =>
+          attributeOfCol.exists(a => a.semanticEquals(expr)))
+        if (!isStrictDuplicate) {
+          Alias(col, qualifiedName)(exprId = col.exprId)
+        } else if (col.qualifier.isDefined) {
+          Alias(col, qualifiedName)(exprId = col.exprId)
+          // The branch below covers a column that is a plain AttributeReference. Selecting
+          // the same column twice is allowed, so such columns are simply given an alias.
+          // isStrictDuplicate is also true for them, which is why the first branch above
+          // does not rename them.
+        } else if (duplicateColumn.get.isInstanceOf[AttributeReference] &&
+                   col.isInstanceOf[AttributeReference]) {
+          Alias(col, qualifiedName)(exprId = col.exprId)
+        } else {
+          col
+        }
+      } else {
+        col
+      }
+    }
+    updatedOutList
+  }
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index bd412e7..9a9a2a6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -124,7 +124,10 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
           exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
                                  a1.asInstanceOf[Alias].child.semanticEquals(a.child)) ||
           exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
-        case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer))
+        case exp =>
+          exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
+                                 a1.asInstanceOf[Alias].child.semanticEquals(exp)) ||
+          exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
       }
     } else {
       false
@@ -244,8 +247,7 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
               val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
               val tAliasMap = new collection.mutable.HashMap[Int, String]()
 
-              val updatedOutList: Seq[NamedExpression] = updateDuplicateColumns(sel_1a)
-              val usel_1a = sel_1a.copy(outputList = updatedOutList)
+              val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
               tChildren += usel_1a
               tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName())
 
@@ -350,18 +352,6 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
     }
   }
 
-  private def updateDuplicateColumns(sel_1a: Select) = {
-    val duplicateNameCols = sel_1a.outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2)
-      .toList
-    val updatedOutList = sel_1a.outputList.map { col =>
-      if (duplicateNameCols.contains(col)) {
-        Alias(col, col.qualifiedName)(exprId = col.exprId)
-      } else {
-        col
-      }
-    }
-    updatedOutList
-  }
 }
 
 object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index 905cd17..b2f1039 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.mv.rewrite
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 
+import org.apache.carbondata.mv.datamap.MVUtil
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans.modular
 import org.apache.carbondata.mv.plans.modular._
@@ -121,11 +122,16 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
       dataMapRelation: ModularPlan): ModularPlan = {
     // Update datamap table relation to the subsumer modular plan
     val updatedSubsumer = subsumer match {
-        // In case of order by it adds extra select but that can be ignored while doing selection.
-      case s@ Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
-        s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))))
-      case s: Select => s.copy(dataMapTableRelation = Some(dataMapRelation))
-      case g: GroupBy => g.copy(dataMapTableRelation = Some(dataMapRelation))
+      // In case of order by it adds extra select but that can be ignored while doing selection.
+      case s@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
+        s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))),
+            outputList = MVUtil.updateDuplicateColumns(s.outputList))
+      case s: Select => s
+        .copy(dataMapTableRelation = Some(dataMapRelation),
+          outputList = MVUtil.updateDuplicateColumns(s.outputList))
+      case g: GroupBy => g
+        .copy(dataMapTableRelation = Some(dataMapRelation),
+          outputList = MVUtil.updateDuplicateColumns(g.outputList))
       case other => other
     }
     (updatedSubsumer, subsumee) match {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index d136b27..e7da16a 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -426,6 +426,32 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap29")
   }
 
+  test("test create datamap with join with group by and projection with filter") {
+    sql("drop datamap if exists datamap29")
+    sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    val frame = sql(
+      "select t1.empname ,t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
+      "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname ")
+    val analyzed = frame.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap29"))
+    checkAnswer(frame, sql("select t1.empname ,t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2  where " +
+                           "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname "))
+    sql(s"drop datamap datamap29")
+  }
+
+  test("test create datamap with join with group by and sub projection with filter with alias") {
+    sql("drop datamap if exists datamap29")
+    sql("create datamap datamap29 using 'mv' as select t1.empname as a, t2.designation as b, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    val frame = sql(
+      "select t1.empname ,t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
+      "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname ")
+    val analyzed = frame.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap29"))
+    checkAnswer(frame, sql("select t1.empname ,t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2  where " +
+                           "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation,t1.empname "))
+    sql(s"drop datamap datamap29")
+  }
+
   ignore("test create datamap with join with group by with filter") {
     sql("drop datamap if exists datamap30")
     sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
@@ -1076,22 +1102,41 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
     sql("create table maintable(name string, age int, add string) stored by 'carbondata'")
     sql("create datamap dupli_mv using 'mv' as select name, sum(age),sum(age) from maintable group by name")
+    sql("create datamap dupli_projection using 'mv' as select age, age,add from maintable")
     sql("create datamap constant_mv using 'mv' as select name, sum(1) ex1 from maintable group by name")
     sql("insert into maintable select 'pheobe',31,'NY'")
     val df1 = sql("select sum(age),name from maintable group by name")
     val df2 = sql("select sum(age),sum(age),name from maintable group by name")
     val df3 = sql("select name, sum(1) ex1 from maintable group by name")
     val df4 = sql("select sum(1) ex1 from maintable group by name")
+    val df5 = sql("select age,age,add from maintable")
+    val df6 = sql("select age,add from maintable")
     val analyzed1 = df1.queryExecution.analyzed
     val analyzed2 = df2.queryExecution.analyzed
     val analyzed3 = df3.queryExecution.analyzed
     val analyzed4 = df4.queryExecution.analyzed
+    val analyzed5 = df5.queryExecution.analyzed
+    val analyzed6 = df6.queryExecution.analyzed
     assert(TestUtil.verifyMVDataMap(analyzed1, "dupli_mv"))
     assert(TestUtil.verifyMVDataMap(analyzed2, "dupli_mv"))
     assert(TestUtil.verifyMVDataMap(analyzed3, "constant_mv"))
     assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed5, "dupli_projection"))
+    assert(TestUtil.verifyMVDataMap(analyzed6, "dupli_projection"))
   }
 
+  test("test mv query when the column names and table name same in join scenario") {
+    sql("drop table IF EXISTS price")
+    sql("drop table IF EXISTS quality")
+    sql("create table price(product string,price int) stored by 'carbondata'")
+    sql("create table quality(product string,quality string) stored by 'carbondata'")
+    sql("create datamap same_mv using 'mv' as select price.product,price.price,quality.product,quality.quality from price,quality where price.product = quality.product")
+    val df1 = sql("select price.product from price,quality where price.product = quality.product")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed1, "same_mv"))
+  }
+
+
   def drop(): Unit = {
     sql("drop table IF EXISTS fact_table1")
     sql("drop table IF EXISTS fact_table2")