You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/09 08:12:49 UTC

[carbondata] branch master updated: [CARBONDATA-3842] Fix incorrect results on mv with limit (Missed code during mv refactoring)

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cdfe920  [CARBONDATA-3842] Fix incorrect results on mv with limit (Missed code during mv refactoring)
cdfe920 is described below

commit cdfe92075846a1c539d930eaf6a39079c3520ce4
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Jun 3 11:46:44 2020 +0530

    [CARBONDATA-3842] Fix incorrect results on mv with limit (Missed code during mv refactoring)
    
    Why is this PR needed?
    This issue was already fixed in PR-3652, but the fix was missed during the mv code refactoring.
    
    What changes were proposed in this PR?
    Copy the subsume flags and flagSpec to the subsumer plan while rewriting with summary datasets.
    Update the flagSpec as per the mv attributes and copy it to the relation.
    
    This closes #3786
---
 .../src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala   | 6 +++++-
 .../src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala   | 6 +++++-
 .../org/apache/carbondata/view/rewrite/TestAllOperationsOnMV.scala  | 4 ++--
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index 7b5d9df..d862920 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -726,7 +726,11 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel
 
           if (r2eJoinsMatch) {
             if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
-              Seq(sel_1a)
+              if (sel_1q.flagSpec.isEmpty) {
+                Seq(sel_1a)
+              } else {
+                Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
+              }
             } else {
               // no compensation needed
               val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
index 7d0f850..7818149 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
@@ -701,6 +701,9 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
       case select: Select if select.modularPlan.isDefined =>
         val planWrapper = select.modularPlan.get.asInstanceOf[MVPlanWrapper]
         val plan = planWrapper.modularPlan.asInstanceOf[Select]
+        val aliasMap = getAliasMap(plan.outputList, select.outputList)
+        // Update the flagSpec as per the mv table attributes.
+        val updatedFlagSpec = updateFlagSpec(select, plan, aliasMap, keepAlias = false)
         // when the output list contains multiple projection of same column, but relation
         // contains distinct columns, mapping may go wrong with columns, so select distinct
         val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, select.modularPlan)
@@ -712,7 +715,8 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
             output2
           }
         }
-        plan.copy(outputList = outputList).setRewritten()
+        plan.copy(outputList = outputList, flags = select.flags, flagSpec = updatedFlagSpec)
+          .setRewritten()
       case select: Select => select.children match {
         case Seq(groupBy: GroupBy) if groupBy.modularPlan.isDefined =>
           val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestAllOperationsOnMV.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestAllOperationsOnMV.scala
index 680c21e..c1d7d0e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestAllOperationsOnMV.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestAllOperationsOnMV.scala
@@ -610,10 +610,10 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     sql("drop materialized view if exists mv1")
     sql("create materialized view mv1  as select a.name,a.price from maintable a")
     var dataFrame = sql("select a.name,a.price from maintable a limit 1")
-    assert(dataFrame.count() == 1)
+    assert(dataFrame.collect().length == 1)
     TestUtil.verifyMVHit(dataFrame.queryExecution.optimizedPlan, "mv1")
     dataFrame = sql("select a.name,a.price from maintable a order by a.name limit 1")
-    assert(dataFrame.count() == 1)
+    assert(dataFrame.collect().length == 1)
     TestUtil.verifyMVHit(dataFrame.queryExecution.optimizedPlan, "mv1")
     sql("drop table if exists maintable")
   }