Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/24 08:47:48 UTC

[carbondata] branch master updated: [CARBONDATA-3398] Fix drop metacache on table having mv datamap

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb50ed  [CARBONDATA-3398] Fix drop metacache on table having mv datamap
bdb50ed is described below

commit bdb50ed781ea067dad9c41060cfbf06647bdc35c
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Jun 10 12:38:16 2019 +0530

    [CARBONDATA-3398] Fix drop metacache on table having mv datamap
    
    Fixed DROP METACACHE on a table having an MV datamap so that the
    cache entries of the MV child table are cleared along with those of
    the main table.
    
    This closes #3274
---
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 48 +++++++++++++++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |  2 +-
 .../command/cache/DropCacheEventListeners.scala    | 59 +++++++++++++---------
 3 files changed, 83 insertions(+), 26 deletions(-)
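
For context, here is a minimal repro sketch of the scenario this commit
fixes, distilled from the new test below (it assumes a running
CarbonSession bound to `spark`; the names are illustrative):

    spark.sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
    spark.sql("insert into table maintable select 'abc',21,2000")
    spark.sql("create datamap dm using 'mv' as select name, sum(price) from maintable group by name")
    // warm the cache, then drop it; before this fix the MV child table's
    // index entries were left behind in the carbon cache
    spark.sql("select name, sum(price) from maintable group by name").collect()
    spark.sql("drop metacache on table maintable")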

diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index c74bb94..839a2e6 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -17,11 +17,16 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.Row
+import scala.collection.JavaConverters._
+import java.util
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
@@ -535,5 +540,46 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     }.getMessage.contains("Operation not allowed on child table.")
   }
 
+  test("drop meta cache on mv datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select name, sum(price) from maintable group by name")
+    sql("select name, sum(price) from maintable group by name").collect()
+    val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql("drop metacache on table maintable").show(false)
+
+    val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    droppedCacheKeys.removeAll(cacheAfterDrop)
+
+    val tableIdentifier = new TableIdentifier("maintable", Some("default"))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    val dbPath = CarbonEnv
+      .getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession)
+    val tablePath = carbonTable.getTablePath
+    val mvPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + "dm_table" +
+                 CarbonCommonConstants.FILE_SEPARATOR
+
+    // Check if table index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))
+
+    // check if cache does not have any more table index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))
+
+    // Check if mv index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(mvPath)))
+
+    // check if cache does not have any more mv index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(mvPath)))
+  }
+
+  def clone(oldSet: util.Set[String]): util.HashSet[String] = {
+    val newSet = new util.HashSet[String]
+    newSet.addAll(oldSet)
+    newSet
+  }
+
 }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 93a6010..094d298 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -191,7 +191,7 @@ object CarbonEnv {
       .addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener )
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
       .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
-      .addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
+      .addListener(classOf[DropTableCacheEvent], DropCacheDataMapEventListener)
       .addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
       .addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
       .addListener(classOf[ShowTableCacheEvent], ShowCacheDataMapEventListener)
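
The change above only swaps which listener object is registered for
DropTableCacheEvent. For readers unfamiliar with the event framework, a
hedged sketch of the listener contract follows, with the object/trait
names taken from the diff below; the onEvent signature matches the
existing listeners, and the body is illustrative only:

    import org.apache.carbondata.events.{DropTableCacheEvent, Event,
      OperationContext, OperationEventListener}

    // Illustrative only: a singleton listener whose onEvent fires for
    // every event class it was registered against via addListener.
    object ExampleDropCacheListener extends OperationEventListener {
      override protected def onEvent(event: Event,
          operationContext: OperationContext): Unit = {
        event match {
          case _: DropTableCacheEvent =>
            // clear cache entries for the table's datamap child tables here
            ()
          case _ => ()
        }
      }
    }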
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
index 61d546e..e695f3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -20,20 +20,19 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener.LOGGER
+import org.apache.spark.util.DataMapUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext,
-  OperationEventListener}
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
-object DropCachePreAggEventListener extends OperationEventListener {
+object DropCacheDataMapEventListener extends OperationEventListener {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -56,25 +55,37 @@ object DropCachePreAggEventListener extends OperationEventListener {
         if (carbonTable.hasDataMapSchema) {
           val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
             .filter(_.getRelationIdentifier != null)
-          for (childSchema <- childrenSchemas) {
-            val childTable =
-              CarbonEnv.getCarbonTable(
-                TableIdentifier(childSchema.getRelationIdentifier.getTableName,
-                  Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
-            try {
-              val dropCacheCommandForChildTable =
-                CarbonDropCacheCommand(
-                  TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
-                  internalCall = true)
-              dropCacheCommandForChildTable.processMetadata(sparkSession)
-            }
-            catch {
-              case e: Exception =>
-                LOGGER.warn(
-                  s"Clean cache for PreAgg table ${ childTable.getTableName } failed.", e)
-            }
-          }
+          dropCacheForChildTables(sparkSession, childrenSchemas)
         }
+        if (DataMapUtil.hasMVDataMap(carbonTable)) {
+          val childrenSchemas = DataMapStoreManager.getInstance
+            .getDataMapSchemasOfTable(carbonTable).asScala
+            .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                     !dataMapSchema.isIndexDataMap)
+          dropCacheForChildTables(sparkSession, childrenSchemas)
+        }
+    }
+  }
+
+  private def dropCacheForChildTables(sparkSession: SparkSession,
+      childrenSchemas: mutable.Buffer[DataMapSchema]): Unit = {
+    for (childSchema <- childrenSchemas) {
+      val childTable =
+        CarbonEnv.getCarbonTable(
+          TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+            Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+      try {
+        val dropCacheCommandForChildTable =
+          CarbonDropCacheCommand(
+            TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
+            internalCall = true)
+        dropCacheCommandForChildTable.processMetadata(sparkSession)
+      }
+      catch {
+        case e: Exception =>
+          LOGGER.warn(
+            s"Clean cache for child table ${ childTable.getTableName } failed.", e)
+      }
     }
   }
 }
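
After this change, the behaviour the new test asserts can also be checked
interactively. A small sketch, with `tablePath` and `mvPath` resolved
exactly as in the test above:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.cache.CacheProvider

    // Remaining cache keys after DROP METACACHE; both the main table's
    // and the MV child table's index entries should be gone.
    val remaining = CacheProvider.getInstance().getCarbonCache
      .getCacheMap.keySet().asScala
    assert(!remaining.exists(_.startsWith(tablePath)))
    assert(!remaining.exists(_.startsWith(mvPath)))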