Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/05/13 19:15:08 UTC

[carbondata] branch master updated: [CARBONDATA-3818] Missed code to check "MV with same query already exists" during MV Refactoring

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4999183  [CARBONDATA-3818] Missed code to check "MV with same query already exists" during MV Refactoring
4999183 is described below

commit 4999183c20499afc35b220db44a89185754dc712
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon May 11 18:04:52 2020 +0530

    [CARBONDATA-3818] Missed code to check "MV with same query already exists" during MV Refactoring
    
    Why is this PR needed?
    The check for "MV with same query already exists" was missed during the MV refactoring.
    
    What changes were proposed in this PR?
    Add code to check whether an MV with the same query already exists, and add a test case.
    
    This closes #3761
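
    The gist of the change: before creating a new materialized view, CarbonCreateMVCommand now asks the MV catalog whether a registered MV was already built from the same query plan, and fails with a MalformedMVCommandException if so. Below is a minimal sketch of that guard as a hypothetical helper failIfSameQueryExists; the CarbonData classes are the ones shown in the diff further down, and the plan type is assumed to be Spark's catalyst LogicalPlan:

        // Sketch only: mirrors the duplicate-query guard added to CarbonCreateMVCommand.doCreate.
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
        import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
        import org.apache.carbondata.view.MVCatalogInSpark

        def failIfSameQueryExists(viewCatalog: MVCatalogInSpark, queryPlan: LogicalPlan): Unit = {
          // getMVWithSameQueryPresent returns the wrapper of a registered MV whose query matches queryPlan, if any
          viewCatalog.getMVWithSameQueryPresent(queryPlan).foreach { existing =>
            val name = existing.viewSchema.getIdentifier.getTableName
            throw new MalformedMVCommandException(
              s"MV with the name `$name` has been already created with the same query")
          }
        }

    In the command itself this check runs right after the query string is parsed into a logical plan and before any MV schema is built, so a duplicate definition is rejected early.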
---
 .../apache/carbondata/view/MVCatalogInSpark.scala  |  7 ++++++
 .../org/apache/carbondata/view/MVHelper.scala      |  4 +--
 .../command/view/CarbonCreateMVCommand.scala       | 29 ++++++++++++++--------
 .../timeseries/TestCreateMVWithTimeSeries.scala    | 12 ++++++++-
 4 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index 85b4909..b75a88b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -162,6 +162,13 @@ case class MVCatalogInSpark(session: SparkSession)
     }
   }
 
+  /**
+   * Check and return mv having same query already present in the catalog
+   */
+  def getMVWithSameQueryPresent(query: LogicalPlan): Option[MVSchemaWrapper] = {
+    lookupSchema(query)
+  }
+
   /** Returns feasible registered mv schemas for processing the given ModularPlan. */
   private def lookupSchema(plan: LogicalPlan): Option[MVSchemaWrapper] = {
     withReadLock {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
index e072653..08c679e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
@@ -289,8 +289,8 @@ object MVHelper {
             if (null != relation) {
               relatedFields += RelatedFieldWrapper(
                 relation.database,
-                reference.name,
-                relation.identifier.table)
+                relation.identifier.table,
+                reference.name)
             }
         }
         findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 22c4b52..1106870 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -101,15 +101,16 @@ case class CarbonCreateMVCommand(
       }
     }
 
-    val schema = doCreate(session, identifier, viewManager)
+    // get mv catalog
+    var viewCatalog = viewManager.getCatalog(catalogFactory, false)
+      .asInstanceOf[MVCatalogInSpark]
+    if (!viewCatalog.session.equals(session)) {
+      viewCatalog = viewManager.getCatalog(catalogFactory, true)
+        .asInstanceOf[MVCatalogInSpark]
+    }
+    val schema = doCreate(session, identifier, viewManager, viewCatalog)
 
     try {
-      var viewCatalog = viewManager.getCatalog(catalogFactory, false)
-        .asInstanceOf[MVCatalogInSpark]
-      if (!viewCatalog.session.equals(session)) {
-        viewCatalog = viewManager.getCatalog(catalogFactory, true)
-          .asInstanceOf[MVCatalogInSpark]
-      }
       viewCatalog.registerSchema(schema)
       if (schema.isRefreshOnManual) {
         viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
@@ -156,10 +157,18 @@ case class CarbonCreateMVCommand(
 
   private def doCreate(session: SparkSession,
       tableIdentifier: TableIdentifier,
-      viewManager: MVManagerInSpark): MVSchema = {
+      viewManager: MVManagerInSpark,
+      viewCatalog: MVCatalogInSpark): MVSchema = {
     val logicalPlan = MVHelper.dropDummyFunction(
       MVQueryParser.getQueryPlan(queryString, session))
-    val modularPlan = checkQuery(session, logicalPlan)
+      // check if mv with same query already exists
+    val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
+    if (mvSchemaWrapper.nonEmpty) {
+      val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
+      throw new MalformedMVCommandException(
+        s"MV with the name `$mvWithSameQuery` has been already created with the same query")
+    }
+    val modularPlan = checkQuery(logicalPlan)
     val viewSchema = getOutputSchema(logicalPlan)
     val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
@@ -462,7 +471,7 @@ case class CarbonCreateMVCommand(
     generatePartitionerField(relatedTablePartitionColumns.toList, Seq.empty)
   }
 
-  private def checkQuery(sparkSession: SparkSession, logicalPlan: LogicalPlan): ModularPlan = {
+  private def checkQuery(logicalPlan: LogicalPlan): ModularPlan = {
     // if there is limit in query string, throw exception, as its not a valid usecase
     logicalPlan match {
       case Limit(_, _) =>
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
index 57b8ce0..0218adb 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.view.timeseries
 
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedMVCommandException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.view.rewrite.TestUtil
@@ -207,6 +207,16 @@ class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll {
     sql("drop materialized view if exists mv1")
   }
 
+  test("check if mv with same query exists") {
+    sql("drop materialized view if exists mv1")
+    sql("create materialized view mv1 as select timeseries(projectjoindate,'Second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'Second')")
+    sql("drop materialized view if exists mv2")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view mv2 as select timeseries(projectjoindate,'Second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'Second')")
+    }.getMessage.contains("MV with the name `mv1` has been already created with the same query")
+    sql("drop materialized view if exists mv1")
+  }
+
   class QueryTask(query: String) extends Callable[String] {
     override def call(): String = {
       var result = "PASS"