You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/24 08:47:48 UTC
[carbondata] branch master updated: [CARBONDATA-3398]Fix drop metacache on table having mv datamap
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new bdb50ed [CARBONDATA-3398]Fix drop metacache on table having mv datamap
bdb50ed is described below
commit bdb50ed781ea067dad9c41060cfbf06647bdc35c
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Jun 10 12:38:16 2019 +0530
[CARBONDATA-3398]Fix drop metacache on table having mv datamap
Fixed drop metacache on table having mv datamap
This closes #3274
---
.../mv/rewrite/TestAllOperationsOnMV.scala | 48 +++++++++++++++++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 2 +-
.../command/cache/DropCacheEventListeners.scala | 59 +++++++++++++---------
3 files changed, 83 insertions(+), 26 deletions(-)
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index c74bb94..839a2e6 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -17,11 +17,16 @@
package org.apache.carbondata.mv.rewrite
-import org.apache.spark.sql.Row
+import scala.collection.JavaConverters._
+import java.util
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.spark.exception.ProcessMetaDataException
@@ -535,5 +540,46 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
}.getMessage.contains("Operation not allowed on child table.")
}
+  test("drop meta cache on mv datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select name, sum(price) from maintable group by name")
+    sql("select name, sum(price) from maintable group by name").collect()
+    val evictedKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql("drop metacache on table maintable").show(false)
+
+    val remainingKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    evictedKeys.removeAll(remainingKeys)
+
+    // Resolve the path prefixes under which main-table and MV ("dm_table") keys are matched
+    val session = sqlContext.sparkSession
+    val mainTableId = new TableIdentifier("maintable", Some("default"))
+    val tablePath = CarbonEnv.getCarbonTable(mainTableId)(session).getTablePath
+    val dbPath = CarbonEnv.getDatabaseLocation(mainTableId.database.get, session)
+    val separator = CarbonCommonConstants.FILE_SEPARATOR
+    val mvPath = dbPath + separator + "dm_table" + separator
+
+    // Some key under the main table path was removed by the drop
+    assert(evictedKeys.asScala.exists(_.startsWith(tablePath)))
+
+    // No key under the main table path is left in the cache
+    assert(!remainingKeys.asScala.exists(_.startsWith(tablePath)))
+
+    // Some key under the MV table path was removed by the drop
+    assert(evictedKeys.asScala.exists(_.startsWith(mvPath)))
+
+    // No key under the MV table path is left in the cache
+    assert(!remainingKeys.asScala.exists(_.startsWith(mvPath)))
+  }
+
+  /**
+   * Copies `oldSet` into a new `HashSet`, so later changes to either set do not affect the other.
+   */
+  def clone(oldSet: util.Set[String]): util.HashSet[String] =
+    new util.HashSet[String](oldSet)
+
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 93a6010..094d298 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -191,7 +191,7 @@ object CarbonEnv {
.addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener )
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
- .addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
+ .addListener(classOf[DropTableCacheEvent], DropCacheDataMapEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheDataMapEventListener)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
index 61d546e..e695f3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -20,20 +20,19 @@ package org.apache.spark.sql.execution.command.cache
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener.LOGGER
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext,
- OperationEventListener}
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-object DropCachePreAggEventListener extends OperationEventListener {
+object DropCacheDataMapEventListener extends OperationEventListener {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -56,25 +55,37 @@ object DropCachePreAggEventListener extends OperationEventListener {
if (carbonTable.hasDataMapSchema) {
val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
.filter(_.getRelationIdentifier != null)
- for (childSchema <- childrenSchemas) {
- val childTable =
- CarbonEnv.getCarbonTable(
- TableIdentifier(childSchema.getRelationIdentifier.getTableName,
- Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
- try {
- val dropCacheCommandForChildTable =
- CarbonDropCacheCommand(
- TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
- internalCall = true)
- dropCacheCommandForChildTable.processMetadata(sparkSession)
- }
- catch {
- case e: Exception =>
- LOGGER.warn(
- s"Clean cache for PreAgg table ${ childTable.getTableName } failed.", e)
- }
- }
+ dropCacheForChildTables(sparkSession, childrenSchemas)
}
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
+ val childrenSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ dropCacheForChildTables(sparkSession, childrenSchemas)
+ }
+ }
+ }
+
+  /**
+   * Runs an internal drop-cache command for the child table of each given datamap schema.
+   * An exception thrown while dropping one child's cache is logged as a warning and the
+   * loop continues; the child table lookup itself is outside that guard.
+   */
+  private def dropCacheForChildTables(sparkSession: SparkSession,
+      childrenSchemas: mutable.Buffer[DataMapSchema]): Unit = {
+    childrenSchemas.foreach { schema =>
+      val relation = schema.getRelationIdentifier
+      val childTable = CarbonEnv.getCarbonTable(
+        TableIdentifier(relation.getTableName, Some(relation.getDatabaseName)))(sparkSession)
+      try {
+        CarbonDropCacheCommand(
+          TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
+          internalCall = true).processMetadata(sparkSession)
+      } catch {
+        case e: Exception =>
+          LOGGER.warn(s"Clean cache for child table ${ childTable.getTableName } failed.", e)
+      }
+    }
+  }
}