You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/03/07 05:05:08 UTC

[carbondata] branch master updated: [CARBONDATA-3305] Added DDL to drop cache for a table

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f98c51  [CARBONDATA-3305] Added DDL to drop cache for a table
3f98c51 is described below

commit 3f98c51237e72987bf5faef4ba3798a57f027f5d
Author: namanrastogi <na...@gmail.com>
AuthorDate: Wed Feb 27 19:45:18 2019 +0530

    [CARBONDATA-3305] Added DDL to drop cache for a table
    
    Added CarbonDropCacheCommand to drop all cache entries for a particular table.
    
    usage: DROP METACACHE ON TABLE tableName
    Dropping the cache of a child table is not supported; the table has to be a parent table.
    Running the above command will clear all the cache entries belonging to the table:
    its index entries, its datamap entries and its forward and reverse dictionaries.
    
    This closes #3138
---
 .../carbondata/core/cache/CarbonLRUCache.java      |  23 ++-
 docs/ddl-of-carbondata.md                          |  13 +-
 .../sql/commands/TestCarbonDropCacheCommand.scala  | 200 +++++++++++++++++++++
 .../sql/commands/TestCarbonShowCacheCommand.scala  |   2 +-
 .../apache/carbondata/events/DropCacheEvents.scala |  28 +++
 .../org/apache/carbondata/events/Events.scala      |   7 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   2 +
 .../command/cache/CarbonDropCacheCommand.scala     | 103 +++++++++++
 .../command/cache/CarbonShowCacheCommand.scala     |  56 +++---
 .../cache/DropCachePreAggEventListener.scala       |  70 ++++++++
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  10 +-
 11 files changed, 473 insertions(+), 41 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 74ff8a0..0c75173 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -62,10 +62,13 @@ public final class CarbonLRUCache {
    */
   public CarbonLRUCache(String propertyName, String defaultPropertyName) {
     try {
-      lruCacheMemorySize = Integer
-          .parseInt(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
+      lruCacheMemorySize = Long
+          .parseLong(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
     } catch (NumberFormatException e) {
-      lruCacheMemorySize = Integer.parseInt(defaultPropertyName);
+      LOGGER.error(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE
+          + " is not in a valid format. Falling back to default value: "
+          + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+      lruCacheMemorySize = Long.parseLong(defaultPropertyName);
     }
     initCache();
     if (lruCacheMemorySize > 0) {
@@ -149,6 +152,17 @@ public final class CarbonLRUCache {
   }
 
   /**
+   * @param keys keys of the entries to remove; all are removed inside one synchronized block
+   */
+  public void removeAll(List<String> keys) {
+    synchronized (lruCacheMap) {
+      for (String key : keys) {
+        removeKey(key);
+      }
+    }
+  }
+
+  /**
    * This method will remove the key from lru cache
    *
    * @param key
@@ -302,6 +316,9 @@ public final class CarbonLRUCache {
    */
   public void clear() {
     synchronized (lruCacheMap) {
+      for (Cacheable cachebleObj : lruCacheMap.values()) {
+        cachebleObj.invalidate();
+      }
       lruCacheMap.clear();
     }
   }
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3476475..e6f209e 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1095,7 +1095,7 @@ Users can specify which columns to include and exclude for local dictionary gene
   about current cache used status in memory through the following command:
 
   ```sql
-  SHOW METADATA
+  SHOW METACACHE
   ``` 
   
   This shows the overall memory consumed in the cache by categories - index files, dictionary and 
@@ -1103,10 +1103,19 @@ Users can specify which columns to include and exclude for local dictionary gene
   database.
   
   ```sql
-  SHOW METADATA ON TABLE tableName
+  SHOW METACACHE ON TABLE tableName
   ```
   
   This shows detailed information on cache usage by the table `tableName` and its carbonindex files, 
   its dictionary files, its datamaps and children tables.
   
   This command is not allowed on child tables.
+
+  ```sql
+  DROP METACACHE ON TABLE tableName
+  ```
+  
+  This clears all cache entries held for the table `tableName`: its carbonindex files,
+  its dictionary files, its datamaps and children tables.
+  
+  This command is not allowed on child tables.
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala
new file mode 100644
index 0000000..982ec76
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sql.commands
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class TestCarbonDropCacheCommand extends QueryTest with BeforeAndAfterAll {
+  // Database used by every test; recreated in beforeAll and dropped in afterAll.
+  val dbName = "cache_db"
+
+  override protected def beforeAll(): Unit = {
+    sql(s"DROP DATABASE IF EXISTS $dbName CASCADE")
+    sql(s"CREATE DATABASE $dbName")
+    sql(s"USE $dbName")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql(s"use default")
+    sql(s"DROP DATABASE $dbName CASCADE")
+  }
+
+  // Global dictionary columns: index and dictionary entries must be gone after the drop.
+  test("Test dictionary") {
+    val tableName = "t1"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int,utilization int, salary int) stored by 'carbondata' " +
+        s"TBLPROPERTIES('DICTIONARY_INCLUDE'='designation, workgroupcategoryname')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+
+    val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql(s"DROP METACACHE ON TABLE $tableName")
+
+    val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    droppedCacheKeys.removeAll(cacheAfterDrop)
+
+    val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
+    val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
+      .map(_.getColumnId).toArray
+
+    // Check if table index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))
+
+    // check if cache does not have any more table index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))
+
+    // check if table dictionary entries are dropped
+    for (dictId <- dictIds) {
+      assert(droppedCacheKeys.asScala.exists(key => key.contains(dictId)))
+    }
+
+    // check if cache does not have any more table dictionary entries
+    for (dictId <- dictIds) {
+      assert(!cacheAfterDrop.asScala.exists(key => key.contains(dictId)))
+    }
+  }
+
+  // Parent with a preaggregate datamap: parent and child index entries must be gone.
+  test("Test preaggregate datamap") {
+    val tableName = "t2"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata'")
+    sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
+        s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+    sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
+        s"GROUP BY workgroupcategoryname").collect()
+    val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql(s"DROP METACACHE ON TABLE $tableName")
+
+    val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    droppedCacheKeys.removeAll(cacheAfterDrop)
+
+    val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    val dbPath = CarbonEnv
+      .getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession)
+    val tablePath = carbonTable.getTablePath
+    val preaggPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + carbonTable.getTableName +
+                     "_" + carbonTable.getTableInfo.getDataMapSchemaList.get(0).getDataMapName +
+                     CarbonCommonConstants.FILE_SEPARATOR
+
+    // Check if table index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))
+
+    // check if cache does not have any more table index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))
+
+    // Check if preaggregate index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(preaggPath)))
+
+    // check if cache does not have any more preaggregate index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(preaggPath)))
+  }
+
+  // Bloomfilter datamap: table index entries and bloom entries must be gone.
+  test("Test bloom filter") {
+    val tableName = "t3"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata'")
+    sql(s"CREATE DATAMAP dblom ON TABLE $tableName USING 'bloomfilter' " +
+        "DMPROPERTIES('INDEX_COLUMNS'='deptno')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+    sql(s"SELECT * FROM $tableName WHERE deptno=10").collect()
+
+    val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql(s"DROP METACACHE ON TABLE $tableName")
+
+    val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    droppedCacheKeys.removeAll(cacheAfterDrop)
+
+    val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    val tablePath = carbonTable.getTablePath
+    val bloomPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + "dblom" +
+                    CarbonCommonConstants.FILE_SEPARATOR
+
+    // Check if table index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))
+
+    // check if cache does not have any more table index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))
+
+    // Check if bloom entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.contains(bloomPath)))
+
+    // check if cache does not have any more bloom entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.contains(bloomPath)))
+  }
+
+  // Dropping the cache of a child (preaggregate) table directly must be rejected.
+  test("Test preaggregate datamap fail") {
+    val tableName = "t4"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata'")
+    sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
+        s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+    sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
+        s"GROUP BY workgroupcategoryname").collect()
+
+    val fail_message = intercept[UnsupportedOperationException] {
+      sql(s"DROP METACACHE ON TABLE ${tableName}_dpagg")
+    }.getMessage
+    assert(fail_message.contains("Operation not allowed on child table."))
+  }
+
+  // Copies a key set so that later changes to the cache do not affect the snapshot.
+  def clone(oldSet: util.Set[String]): util.HashSet[String] = {
+    val newSet = new util.HashSet[String]
+    newSet.addAll(oldSet)
+    newSet
+  }
+}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index 0e1cd00..e999fc7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -128,7 +128,7 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("use cache_empty_db").collect()
     val result1 = sql("show metacache").collect()
     assertResult(2)(result1.length)
-    assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1))
+    assertResult(Row("cache_empty_db", "ALL", "0 bytes", "0 bytes", "0 bytes"))(result1(1))
 
     sql("use cache_db").collect()
     val result2 = sql("show metacache").collect()
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
new file mode 100644
index 0000000..2e8b78e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Event fired by the drop cache command before the cache entries of `carbonTable` are removed.
+ */
+case class DropCacheEvent(carbonTable: CarbonTable, sparkSession: SparkSession,
+    internalCall: Boolean) extends Event with DropCacheEventInfo
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 1830a35..c03d3c6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -63,6 +63,13 @@ trait DropTableEventInfo {
 }
 
 /**
+ * event for drop cache, exposes the table whose cache entries are to be dropped
+ */
+trait DropCacheEventInfo {
+  val carbonTable: CarbonTable
+}
+
+/**
  * event for alter_table_drop_column
  */
 trait AlterTableDropColumnEventInfo {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a7677d7..60d896a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
+import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -185,6 +186,7 @@ object CarbonEnv {
       .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
       .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
+      .addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
new file mode 100644
index 0000000..e955ed9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.events.{DropCacheEvent, OperationContext, OperationListenerBus}
+
+/**
+ * Command behind `DROP METACACHE ON TABLE tableName`: removes the index, bloom datamap and
+ * dictionary entries of one table from the Carbon LRU cache.
+ *
+ * @param tableIdentifier table whose cache entries are dropped
+ * @param internalCall    flag passed on to listeners through the fired [[DropCacheEvent]]
+ */
+case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall: Boolean = false)
+  extends MetadataCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /** Looks up the table in the session and clears its cache entries; returns no rows. */
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    clearCache(carbonTable, sparkSession)
+    Seq.empty
+  }
+
+  /**
+   * Fires a [[DropCacheEvent]] and then removes the table's entries from the cache.
+   * Nothing is removed when no cache instance exists.
+   */
+  def clearCache(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
+    LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+
+    // Listeners run before this table's own entries are removed.
+    val operationContext = new OperationContext
+    OperationListenerBus.getInstance.fireEvent(
+      DropCacheEvent(carbonTable, sparkSession, internalCall), operationContext)
+
+    val cache = CacheProvider.getInstance().getCarbonCache
+    if (cache != null) {
+      // Keys are matched after separator normalization, so the table path is normalized too.
+      val tablePath =
+        normalizePath(carbonTable.getTablePath) + CarbonCommonConstants.FILE_SEPARATOR
+
+      // Dictionary IDs
+      val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
+        .map(_.getColumnId).toArray
+
+      // Collect the matching keys first and remove them once the iteration is finished.
+      val keysToRemove = cache.getCacheMap.entrySet().iterator().asScala.filter { entry =>
+        entry.getValue match {
+          case _: BlockletDataMapIndexWrapper => // index
+            normalizePath(entry.getKey).startsWith(tablePath)
+          case _: BloomCacheKeyValue.CacheValue => // bloom datamap
+            normalizePath(entry.getKey).contains(tablePath)
+          case _: AbstractColumnDictionaryInfo => // dictionary
+            dictIds.exists(id => entry.getKey.startsWith(id))
+          case _ => false
+        }
+      }.map(_.getKey).toList
+      cache.removeAll(keysToRemove.asJava)
+    }
+
+    LOGGER.info("Drop cache request served for table " + carbonTable.getTableName)
+  }
+
+  /** Replaces Windows file separators with the Carbon file separator. */
+  private def normalizePath(path: String): String = {
+    path.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+      CarbonCommonConstants.FILE_SEPARATOR)
+  }
+
+  override protected def opName: String = "DROP CACHE"
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index e937c32..e5f89d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -66,29 +66,24 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
-      Seq(Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
+      Seq(
+        Row("ALL", "ALL", byteCountToDisplaySize(0L),
+          byteCountToDisplaySize(0L), byteCountToDisplaySize(0L)),
+        Row(currentDatabase, "ALL", byteCountToDisplaySize(0L),
+          byteCountToDisplaySize(0L), byteCountToDisplaySize(0L)))
     } else {
-      val tableIdents = sparkSession.sessionState.catalog.listTables(currentDatabase).toArray
-      val dbLocation = CarbonEnv.getDatabaseLocation(currentDatabase, sparkSession)
-      val tempLocation = dbLocation.replace(
-        CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-      val tablePaths = tableIdents.map { tableIdent =>
-        (tempLocation + CarbonCommonConstants.FILE_SEPARATOR +
-         tableIdent.table + CarbonCommonConstants.FILE_SEPARATOR,
-          CarbonEnv.getDatabaseName(tableIdent.database)(sparkSession) + "." + tableIdent.table)
+      val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+        .listAllTables(sparkSession)
+        .filter { table =>
+        table.getDatabaseName.equalsIgnoreCase(currentDatabase)
+      }
+      val tablePaths = carbonTables
+        .map { table =>
+          (table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR,
+            table.getDatabaseName + "." + table.getTableName)
       }
 
-      val dictIds = tableIdents
-        .map { tableIdent =>
-          var table: CarbonTable = null
-          try {
-            table = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-          } catch {
-            case _ =>
-          }
-          table
-        }
+      val dictIds = carbonTables
         .filter(_ != null)
         .flatMap { table =>
           table
@@ -159,7 +154,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
         Seq(
           Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
             byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
-          Row(currentDatabase, "ALL", "0", "0", "0"))
+          Row(currentDatabase, "ALL", byteCountToDisplaySize(0),
+            byteCountToDisplaySize(0), byteCountToDisplaySize(0)))
       } else {
         val tableList = tableMapIndexSize
           .map(_._1)
@@ -187,17 +183,11 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
   }
 
   def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
-    val tableName = carbonTable.getTableName
-    val databaseName = carbonTable.getDatabaseName
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
       Seq.empty
     } else {
-      val dbLocation = CarbonEnv
-        .getDatabaseLocation(databaseName, sparkSession)
-        .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-      val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
-                      tableName + CarbonCommonConstants.FILE_SEPARATOR
+      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
       var numIndexFilesCached = 0
 
       // Path -> Name, Type
@@ -209,8 +199,10 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
       datamapSize.put(tablePath, 0)
       // children tables
       for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
-        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + "_" +
-                   schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
+        val childTableName = carbonTable.getTableName + "_" + schema.getDataMapName
+        val childTable = CarbonEnv
+          .getCarbonTable(Some(carbonTable.getDatabaseName), childTableName)(sparkSession)
+        val path = childTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
         val name = schema.getDataMapName
         val dmType = schema.getProviderName
         datamapName.put(path, (name, dmType))
@@ -219,9 +211,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
       // index schemas
       for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
         .asScala) {
-        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName +
-                   CarbonCommonConstants.FILE_SEPARATOR + schema.getDataMapName +
-                   CarbonCommonConstants.FILE_SEPARATOR
+        val path = tablePath + schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
         val name = schema.getDataMapName
         val dmType = schema.getProviderName
         datamapName.put(path, (name, dmType))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
new file mode 100644
index 0000000..3d03c60
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
@@ -0,0 +1,70 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext,
+  OperationEventListener}
+
+/**
+ * Listener for [[DropCacheEvent]]: rejects direct drop-cache requests on child tables and
+ * cascades a parent table's request to each of its child tables.
+ */
+object DropCachePreAggEventListener extends OperationEventListener {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event            fired event; anything but a [[DropCacheEvent]] is ignored
+   * @param operationContext context of the running operation, not used here
+   * @throws UnsupportedOperationException for a non-internal request on a child table
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    event match {
+      case DropCacheEvent(carbonTable, sparkSession, internalCall) =>
+        if (carbonTable.isChildDataMap && !internalCall) {
+          throw new UnsupportedOperationException("Operation not allowed on child table.")
+        }
+
+        if (carbonTable.hasDataMapSchema) {
+          // Schemas without a relation identifier are skipped.
+          carbonTable.getTableInfo.getDataMapSchemaList.asScala
+            .map(_.getRelationIdentifier)
+            .filter(_ != null)
+            .foreach { relation =>
+              val childIdentifier =
+                TableIdentifier(relation.getTableName, Some(relation.getDatabaseName))
+              val childTable = CarbonEnv.getCarbonTable(childIdentifier)(sparkSession)
+              // internalCall = true lets the child table pass the guard above.
+              CarbonDropCacheCommand(childIdentifier, internalCall = true)
+                .clearCache(childTable, sparkSession)
+            }
+        }
+      case _ => // not a drop-cache event: nothing to do
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index a2923b8..5f5cc12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand
+import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
@@ -95,7 +95,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     createStream | dropStream | showStreams
 
   protected lazy val cacheManagement: Parser[LogicalPlan] =
-    showCache
+    showCache | dropCache
 
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -503,6 +503,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         CarbonShowCacheCommand(table)
     }
 
+  /** Parses `DROP METACACHE ON TABLE tableName` (optional trailing `;`) into a drop command. */
+  protected lazy val dropCache: Parser[LogicalPlan] =
+    DROP ~> METACACHE ~> ontable <~ opt(";") ^^ { tableIdentifier =>
+      CarbonDropCacheCommand(tableIdentifier)
+    }
+
   protected lazy val cli: Parser[LogicalPlan] =
     (CARBONCLI ~> FOR ~> TABLE) ~> (ident <~ ".").? ~ ident ~
     (OPTIONS ~> "(" ~> commandOptions <~ ")").? <~