You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/24 04:50:23 UTC

[carbondata] branch master updated: [CARBONDATA-3899] Drop materialized view when executed concurrently from 4 concurrent client fails in all 4 clients.

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6ab28  [CARBONDATA-3899] Drop materialized view when executed concurrently from 4 concurrent client fails in all 4 clients.
ea6ab28 is described below

commit ea6ab2836d273bebbfe5aaf2faba1fdf0d3af12d
Author: Shreelekhya <sh...@yahoo.com>
AuthorDate: Mon Jul 13 21:08:54 2020 +0530

    [CARBONDATA-3899] Drop materialized view when executed concurrently from 4 concurrent client fails in
    all 4 clients.
    
    Why is this PR needed?
    1. drop materialized view when executed concurrently from 4 concurrent client fails in all 4 clients.
    Also, materialized view when created in one session and queried on another open session will not hit MV.
    2. drop mv if not exists must fail.
    
    What changes were proposed in this PR?
    1. made view manager as a single instance for all sessions, so that changes executed from one session
    gets reflected in others.
    2. added check for drop mv command to throw an exception if the materialized view is not present.
    
    This closes #3841
---
 .../java/org/apache/carbondata/core/view/MVManager.java   |  2 +-
 .../org/apache/carbondata/view/MVManagerInSpark.scala     | 15 +++------------
 .../sql/execution/command/view/CarbonDropMVCommand.scala  |  8 ++++++++
 .../test/scala/org/apache/carbondata/view/MVTest.scala    |  9 +++++++++
 .../apache/carbondata/view/rewrite/MVCreateTestCase.scala |  4 ++--
 5 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index e2a8d9f..bc02207 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -143,7 +143,7 @@ public abstract class MVManager {
   /**
    * Drops the mv schema from storage
    *
-   * @param viewName index name
+   * @param viewName mv name
    */
   public void deleteSchema(String databaseName, String viewName) throws IOException {
     schemaProvider.dropSchema(this, databaseName, viewName);
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index 8bd4e86..50fb6d7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -48,23 +48,14 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
 
 object MVManagerInSpark {
 
-  private val MANAGER_MAP_BY_SESSION =
-    new util.HashMap[SparkSession, MVManagerInSpark]()
+  private var viewManager: MVManagerInSpark = null
 
+  // returns single MVManager instance for all the current sessions.
   def get(session: SparkSession): MVManagerInSpark = {
-    var viewManager = MANAGER_MAP_BY_SESSION.get(session)
     if (viewManager == null) {
-      MANAGER_MAP_BY_SESSION.synchronized {
-        viewManager = MANAGER_MAP_BY_SESSION.get(session)
+      this.synchronized {
         if (viewManager == null) {
           viewManager = new MVManagerInSpark(session)
-          MANAGER_MAP_BY_SESSION.put(session, viewManager)
-          session.sparkContext.addSparkListener(new SparkListener {
-            override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-              CarbonEnv.carbonEnvMap.remove(session)
-              ThreadLocalSessionInfo.unsetAll()
-            }
-          })
         }
       }
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
index 5aa5a04..b201db6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -90,6 +91,13 @@ case class CarbonDropMVCommand(
         }
 
         this.dropTableCommand = dropTableCommand
+      } else {
+        if (!ifExistsSet) {
+          throw new MalformedMVCommandException(
+            s"Materialized view with name ${ databaseName }.${ name } does not exists")
+        } else {
+          return Seq.empty
+        }
       }
     } catch {
       case exception: Exception =>
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
index bf9ac6b..00dd6b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
@@ -178,6 +180,13 @@ class MVTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test drop mv must fail if not exists") {
+    val ex = intercept[MalformedMVCommandException] {
+      sql("drop materialized view MV_notPresent")
+    }
+    assert(ex.getMessage.contains("Materialized view with name default.MV_notPresent does not exists"))
+  }
+
   test("test refresh mv on manual") {
     sql("drop materialized view if exists mv1")
     sql("drop table if exists source")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index d5b11fd..db39737 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -955,9 +955,9 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(" insert into mvtable1 select 'n4',12,12")
     sql("update mvtable1 set(name) = ('updatedName')").show()
     checkAnswer(sql("select count(*) from mvtable1 where name = 'updatedName'"),Seq(Row(4)))
+    sql(s"drop materialized view MV11")
     sql("drop table if exists mvtable1")
     sql("drop table if exists mvtable2")
-    sql(s"drop materialized view MV11")
   }
 
   test("test create materialized view with streaming table")  {
@@ -1166,8 +1166,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     val frame = sql("select count(*) from mvtable1")
     assert(!TestUtil.verifyMVHit(frame.queryExecution.optimizedPlan, "MV11"))
     checkAnswer(frame,Seq(Row(1)))
-    sql("drop table if exists mvtable1")
     sql(s"drop materialized view MV11")
+    sql("drop table if exists mvtable1")
   }
 
   test("test mv with duplicate columns in query and constant column") {