Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:46 UTC
[carbondata] 26/41: [CARBONDATA-3318] Added PreAgg & Bloom Event-Listener for ShowCacheCommand
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit c4f32dd3c127011f656e7caea82e5df7af7c1c2a
Author: namanrastogi <na...@gmail.com>
AuthorDate: Sat Mar 9 15:49:00 2019 +0530
[CARBONDATA-3318] Added PreAgg & Bloom Event-Listener for ShowCacheCommand
Decoupling of Cache Commands
1. Added PreAgg Event-Listener for ShowCacheCommand
2. Added Bloom Event-Listener for ShowCacheCommand
3. Added PreAgg Event-Listener for DropCacheCommand
4. Added Bloom Event-Listener for DropCacheCommand
5. Updated doc
6. Support external table
6.1 Show the "(external table)" marker in the comments column and with the table name
6.2 Count the index files for external tables
This closes #3146
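
For orientation, the SQL surface touched by this change can be exercised roughly as below. This is an illustrative sketch in the style of the test suite, not part of the commit: the table and datamap names are made up, the bloom datamap DDL follows the standard CarbonData syntax, and DROP METACACHE is assumed to take the ON TABLE form documented in ddl-of-carbondata.md.

    // Illustrative sketch only -- table/datamap names are hypothetical, not part of this commit.
    sql("CREATE TABLE t1 (id INT, name STRING) STORED BY 'carbondata'")
    sql("CREATE DATAMAP dm_bloom ON TABLE t1 USING 'bloomfilter' " +
        "DMPROPERTIES('INDEX_COLUMNS'='name')")
    sql("SELECT * FROM t1 WHERE name = 'a'").show()  // populates the driver-side index/bloom cache
    sql("SHOW METACACHE").show()                     // database-level summary (CarbonShowCacheCommand)
    sql("SHOW METACACHE ON TABLE t1").show()         // table-level rows; PreAgg/Bloom rows come from the new listeners
    sql("DROP METACACHE ON TABLE t1")                // CarbonDropCacheCommand; Drop* listeners clear child/bloom entries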
---
.../datamap/bloom/BloomCacheKeyValue.java | 2 +-
.../bloom/BloomCoarseGrainDataMapFactory.java | 9 +-
docs/ddl-of-carbondata.md | 13 +
.../sql/commands/TestCarbonShowCacheCommand.scala | 51 ++-
.../{DropCacheEvents.scala => CacheEvents.scala} | 11 +-
.../org/apache/carbondata/events/Events.scala | 9 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 7 +-
.../sql/execution/command/cache/CacheUtil.scala | 108 +++++++
.../command/cache/CarbonDropCacheCommand.scala | 59 +---
.../command/cache/CarbonShowCacheCommand.scala | 341 ++++++++-------------
.../command/cache/DropCacheEventListeners.scala | 121 ++++++++
.../cache/DropCachePreAggEventListener.scala | 70 -----
.../command/cache/ShowCacheEventListeners.scala | 126 ++++++++
13 files changed, 581 insertions(+), 346 deletions(-)
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
index a66ee63..70624eb 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -35,7 +35,7 @@ public class BloomCacheKeyValue {
private String shardPath;
private String indexColumn;
- CacheKey(String shardPath, String indexColumn) {
+ public CacheKey(String shardPath, String indexColumn) {
this.shardPath = shardPath;
this.indexColumn = indexColumn;
}
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 9785549..11b216e 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -227,7 +227,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
* returns all shard directories of bloom index files for query
* if bloom index files are merged we should get only one shard path
*/
- private Set<String> getAllShardPaths(String tablePath, String segmentId) {
+ public static Set<String> getAllShardPaths(String tablePath, String segmentId,
+ String dataMapName) {
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
tablePath, segmentId, dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
@@ -257,7 +258,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
try {
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
- shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
+ shardPaths =
+ getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
@@ -299,7 +301,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
- shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
+ shardPaths =
+ getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index e6f209e..07a2670 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1119,3 +1119,16 @@ Users can specify which columns to include and exclude for local dictionary gene
its dictionary files, its datamaps and children tables.
This command is not allowed on child tables.
+
+### Important points
+
+ 1. Cache information is updated only after the select query is executed.
+
+ 2. In case of ALTER TABLE, the already loaded cache is invalidated when any subsequent select query
+ is fired.
+
+ 3. Dictionary is loaded into cache only when a dictionary column is queried directly. If there is
+ no direct query on a dictionary column, the dictionary cache is not loaded.
+ For `SELECT * FROM t1`, the dictionary is still loaded, but on the executors rather than on the
+ driver; only the final result rows are returned to the driver, so no entry appears in the driver
+ cache when we run `SHOW METACACHE` or `SHOW METACACHE ON TABLE t1`.
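
To make point 3 concrete, a minimal sketch in the style of the test suite; `t1` and its global-dictionary column `name` are hypothetical, and the comments simply restate the note above.

    // Assumes 'name' is a global-dictionary column of the hypothetical table t1.
    sql("SELECT * FROM t1").show()                      // dictionary decoded on executors only
    sql("SHOW METACACHE ON TABLE t1").show()            // Dictionary row still shows 0 B on the driver
    sql("SELECT name FROM t1 WHERE name = 'a'").show()  // direct query on the dictionary column
    sql("SHOW METACACHE ON TABLE t1").show()            // Dictionary row now reports a non-zero size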
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index e7fd5fa..35ac2e3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -17,10 +17,14 @@
package org.apache.carbondata.sql.commands
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
override protected def beforeAll(): Unit = {
// use new database
@@ -133,6 +137,28 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
}
+ test("test external table show cache") {
+ sql(s"CREATE TABLE employeeTable(empno int, empname String, designation String, " +
+ s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+ s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+ s"attendance int, utilization int, salary int) stored by 'carbondata'")
+ sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE employeeTable")
+ val table = CarbonEnv.getCarbonTable(Some("default"), "employeeTable")(sqlContext.sparkSession)
+ val location = FileFactory
+ .getUpdatedFilePath(
+ table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "/Fact/Part0/Segment_0")
+ sql(s"CREATE EXTERNAL TABLE extTable stored as carbondata LOCATION '${location}'")
+ sql("select * from extTable").show()
+ val rows = sql("SHOW METACACHE ON TABLE extTable").collect()
+ var isPresent = false
+ rows.foreach(row => {
+ if (row.getString(2).equalsIgnoreCase("1/1 index files cached (external table)")){
+ isPresent = true
+ }
+ })
+ Assert.assertTrue(isPresent)
+ }
+
override protected def afterAll(): Unit = {
sql("use default").collect()
dropTable
@@ -145,42 +171,63 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS default.cache_4")
sql("DROP TABLE IF EXISTS default.cache_5")
sql("DROP TABLE IF EXISTS empTable")
+ sql("DROP TABLE IF EXISTS employeeTable")
+ sql("DROP TABLE IF EXISTS extTable")
}
test("show cache") {
+
+ // Empty database
sql("use cache_empty_db").collect()
val result1 = sql("show metacache").collect()
assertResult(2)(result1.length)
assertResult(Row("cache_empty_db", "ALL", "0 B", "0 B", "0 B"))(result1(1))
+ // Database with 3 tables but only 2 are in cache
sql("use cache_db").collect()
val result2 = sql("show metacache").collect()
assertResult(4)(result2.length)
+ // Make sure PreAgg tables are not listed in SHOW METACACHE output
sql("use default").collect()
val result3 = sql("show metacache").collect()
val dataMapCacheInfo = result3
.map(row => row.getString(1))
.filter(table => table.equals("cache_4_cache_4_count"))
- assertResult(1)(dataMapCacheInfo.length)
+ assertResult(0)(dataMapCacheInfo.length)
}
test("show metacache on table") {
sql("use cache_db").collect()
+
+ // Table with Index, Dictionary & Bloom filter
val result1 = sql("show metacache on table cache_1").collect()
assertResult(3)(result1.length)
+ assertResult("1/1 index files cached")(result1(0).getString(2))
+ assertResult("bloomfilter")(result1(2).getString(2))
+ // Table with Index and Dictionary
val result2 = sql("show metacache on table cache_db.cache_2").collect()
assertResult(2)(result2.length)
+ assertResult("2/2 index files cached")(result2(0).getString(2))
+ assertResult("0 B")(result2(1).getString(1))
+ // Table not in cache
checkAnswer(sql("show metacache on table cache_db.cache_3"),
Seq(Row("Index", "0 B", "0/1 index files cached"), Row("Dictionary", "0 B", "")))
+ // Table with Index, Dictionary & PreAgg child table
val result4 = sql("show metacache on table default.cache_4").collect()
assertResult(3)(result4.length)
+ assertResult("1/1 index files cached")(result4(0).getString(2))
+ assertResult("0 B")(result4(1).getString(1))
+ assertResult("preaggregate")(result4(2).getString(2))
sql("use default").collect()
+
+ // Table with 5 index files
val result5 = sql("show metacache on table cache_5").collect()
assertResult(2)(result5.length)
+ assertResult("5/5 index files cached")(result5(0).getString(2))
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
similarity index 80%
rename from integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
rename to integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
index 2e8b78e..ec5127f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CacheEvents.scala
@@ -21,8 +21,15 @@ import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-case class DropCacheEvent(
+case class DropTableCacheEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
internalCall: Boolean)
- extends Event with DropCacheEventInfo
+ extends Event with DropTableCacheEventInfo
+
+
+case class ShowTableCacheEvent(
+ carbonTable: CarbonTable,
+ sparkSession: SparkSession,
+ internalCall: Boolean)
+ extends Event with ShowTableCacheEventInfo
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index c03d3c6..e6b9213 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -63,9 +63,16 @@ trait DropTableEventInfo {
}
/**
+ * event for show cache
+ */
+trait ShowTableCacheEventInfo {
+ val carbonTable: CarbonTable
+}
+
+/**
* event for drop cache
*/
-trait DropCacheEventInfo {
+trait DropTableCacheEventInfo {
val carbonTable: CarbonTable
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 60d896a..7ca9945 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
-import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
+import org.apache.spark.sql.execution.command.cache._
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -186,7 +186,10 @@ object CarbonEnv {
.addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
- .addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
+ .addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
+ .addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
+ .addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
+ .addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
}
/**
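
The registrations above rely on a simple contract: CarbonShowCacheCommand puts a mutable size map into the OperationContext keyed by the table's unique name, fires ShowTableCacheEvent, and each listener fills in the sizes for the datamap type it owns (see getTableCache and the Show*EventListeners further down in this patch). Below is a minimal sketch of a listener that follows this contract; the object name and the datamap entry it writes are hypothetical.

    import scala.collection.mutable
    import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener, ShowTableCacheEvent}

    // Sketch only: MyDataMapCacheListener is hypothetical, not part of this commit.
    object MyDataMapCacheListener extends OperationEventListener {
      override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case showEvent: ShowTableCacheEvent =>
            val carbonTable = showEvent.carbonTable
            // The command stored this map before firing the event; key = datamap name,
            // value = (provider name, index size, datamap size).
            val currentTableSizeMap = operationContext.getProperty(carbonTable.getTableUniqueName)
              .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
            // A real listener would compute the cached size for its datamap type here.
            currentTableSizeMap.put("my_datamap", ("mydatamapprovider", 0L, 0L))
          case _ =>
        }
      }
    }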
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
new file mode 100644
index 0000000..615d8e0
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import org.apache.hadoop.mapred.JobConf
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.cache.CacheType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
+import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+
+object CacheUtil {
+
+ /**
+ * Given a carbonTable, returns the list of all carbonindex files
+ *
+ * @param carbonTable
+ * @return List of all index files
+ */
+ def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
+ if (carbonTable.isTransactionalTable) {
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier).asScala.flatMap {
+ segment =>
+ segment.getCommittedIndexFile.keySet().asScala
+ }.toList
+ } else {
+ val tablePath = carbonTable.getTablePath
+ val readCommittedScope = new LatestFilesReadCommittedScope(tablePath,
+ FileFactory.getConfiguration)
+ readCommittedScope.getSegmentList.flatMap {
+ load =>
+ val seg = new Segment(load.getLoadName, null, readCommittedScope)
+ seg.getCommittedIndexFile.keySet().asScala
+ }.toList
+ }
+ }
+
+ /**
+ * Given a carbonTable file, returns a list of all dictionary entries which can be in cache
+ *
+ * @param carbonTable
+ * @return List of all dict entries which can in cache
+ */
+ def getAllDictCacheKeys(carbonTable: CarbonTable): List[String] = {
+ def getDictCacheKey(columnIdentifier: String,
+ cacheType: CacheType[_, _]): String = {
+ columnIdentifier + CarbonCommonConstants.UNDERSCORE + cacheType.getCacheName
+ }
+
+ carbonTable.getAllDimensions.asScala
+ .collect {
+ case dict if dict.isGlobalDictionaryEncoding =>
+ Seq(getDictCacheKey(dict.getColumnId, CacheType.FORWARD_DICTIONARY),
+ getDictCacheKey(dict.getColumnId, CacheType.REVERSE_DICTIONARY))
+ }.flatten.toList
+ }
+
+ def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
+ val segments = CarbonDataMergerUtil
+ .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
+
+ // Generate shard Path for the datamap
+ val shardPaths = segments.flatMap {
+ segment =>
+ BloomCoarseGrainDataMapFactory.getAllShardPaths(carbonTable.getTablePath,
+ segment.getSegmentNo, datamap.getDataMapName).asScala
+ }
+
+ // get index columns
+ val indexColumns = carbonTable.getIndexedColumns(datamap).asScala.map {
+ entry =>
+ entry.getColName
+ }
+
+ // generate cache key using shard path and index columns on which bloom was created.
+ val datamapKeys = shardPaths.flatMap {
+ shardPath =>
+ indexColumns.map {
+ indexCol =>
+ new BloomCacheKeyValue.CacheKey(shardPath, indexCol).toString
+ }
+ }
+ datamapKeys.toList
+ }
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index e955ed9..a0bb43e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.command.cache
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,12 +25,8 @@ import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
-import org.apache.carbondata.events.{DropCacheEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{DropTableCacheEvent, OperationContext, OperationListenerBus}
case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall: Boolean = false)
extends MetadataCommand {
@@ -45,59 +40,27 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
}
def clearCache(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
- LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+ LOGGER.info("Drop cache request received for table " + carbonTable.getTableUniqueName)
- val dropCacheEvent = DropCacheEvent(
- carbonTable,
- sparkSession,
- internalCall
- )
+ val dropCacheEvent = DropTableCacheEvent(carbonTable, sparkSession, internalCall)
val operationContext = new OperationContext
OperationListenerBus.getInstance.fireEvent(dropCacheEvent, operationContext)
val cache = CacheProvider.getInstance().getCarbonCache
if (cache != null) {
- val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
- // Dictionary IDs
- val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
- .map(_.getColumnId).toArray
+ // Get all Index files for the specified table.
+ val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)
- // Remove elements from cache
- val keysToRemove = ListBuffer[String]()
- val cacheIterator = cache.getCacheMap.entrySet().iterator()
- while (cacheIterator.hasNext) {
- val entry = cacheIterator.next()
- val cache = entry.getValue
+ // Extract dictionary keys for the table and create cache keys from those
+ val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
- if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
- // index
- val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
- CarbonCommonConstants.FILE_SEPARATOR)
- if (indexPath.startsWith(tablePath)) {
- keysToRemove += entry.getKey
- }
- } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
- // bloom datamap
- val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
- CarbonCommonConstants.FILE_SEPARATOR)
- if (shardPath.contains(tablePath)) {
- keysToRemove += entry.getKey
- }
- } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
- // dictionary
- val dictId = dictIds.find(id => entry.getKey.startsWith(id))
- if (dictId.isDefined) {
- keysToRemove += entry.getKey
- }
- }
- }
+ // Remove elements from cache
+ val keysToRemove = allIndexFiles ++ dictKeys
cache.removeAll(keysToRemove.asJava)
}
-
- LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+ LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
}
- override protected def opName: String = "DROP CACHE"
-
+ override protected def opName: String = "DROP METACACHE"
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 462be83..e19ee48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -20,27 +20,28 @@ package org.apache.spark.sql.execution.command.cache
import scala.collection.mutable
import scala.collection.JavaConverters._
+import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.spark.sql.types.StringType
-import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.{CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, ShowTableCacheEvent}
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.spark.util.CommonUtil.bytesToDisplaySize
-/**
- * SHOW CACHE
- */
-case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
+
+case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
+ internalCall: Boolean = false)
extends MetadataCommand {
override def output: Seq[AttributeReference] = {
@@ -61,241 +62,147 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
override protected def opName: String = "SHOW CACHE"
- def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+ def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
val cache = CacheProvider.getInstance().getCarbonCache()
if (cache == null) {
Seq(
- Row("ALL", "ALL", bytesToDisplaySize(0L),
- bytesToDisplaySize(0L), bytesToDisplaySize(0L)),
- Row(currentDatabase, "ALL", bytesToDisplaySize(0L),
- bytesToDisplaySize(0L), bytesToDisplaySize(0L)))
+ Row("ALL", "ALL", 0L, 0L, 0L),
+ Row(currentDatabase, "ALL", 0L, 0L, 0L))
} else {
val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- .listAllTables(sparkSession)
- .filter { table =>
- table.getDatabaseName.equalsIgnoreCase(currentDatabase)
- }
- val tablePaths = carbonTables
- .map { table =>
- (table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR,
- table.getDatabaseName + "." + table.getTableName)
+ .listAllTables(sparkSession).filter {
+ carbonTable =>
+ carbonTable.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
+ !carbonTable.isChildDataMap
}
- val dictIds = carbonTables
- .filter(_ != null)
- .flatMap { table =>
- table
- .getAllDimensions
- .asScala
- .filter(_.isGlobalDictionaryEncoding)
- .toArray
- .map(dim => (dim.getColumnId, table.getDatabaseName + "." + table.getTableName))
- }
-
- // all databases
- var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
- // current database
+ // All tables of current database
var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
- val tableMapIndexSize = mutable.HashMap[String, Long]()
- val tableMapDatamapSize = mutable.HashMap[String, Long]()
- val tableMapDictSize = mutable.HashMap[String, Long]()
- val cacheIterator = cache.getCacheMap.entrySet().iterator()
- while (cacheIterator.hasNext) {
- val entry = cacheIterator.next()
- val cache = entry.getValue
- if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
- // index
- allIndexSize = allIndexSize + cache.getMemorySize
- val indexPath = entry.getKey.replace(
- CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
- val tablePath = tablePaths.find(path => indexPath.startsWith(path._1))
- if (tablePath.isDefined) {
- dbIndexSize = dbIndexSize + cache.getMemorySize
- val memorySize = tableMapIndexSize.get(tablePath.get._2)
- if (memorySize.isEmpty) {
- tableMapIndexSize.put(tablePath.get._2, cache.getMemorySize)
- } else {
- tableMapIndexSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
- }
- }
- } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
- // bloom datamap
- allDatamapSize = allDatamapSize + cache.getMemorySize
- val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
- CarbonCommonConstants.FILE_SEPARATOR)
- val tablePath = tablePaths.find(path => shardPath.contains(path._1))
- if (tablePath.isDefined) {
- dbDatamapSize = dbDatamapSize + cache.getMemorySize
- val memorySize = tableMapDatamapSize.get(tablePath.get._2)
- if (memorySize.isEmpty) {
- tableMapDatamapSize.put(tablePath.get._2, cache.getMemorySize)
- } else {
- tableMapDatamapSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
- }
- }
- } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
- // dictionary
- allDictSize = allDictSize + cache.getMemorySize
- val dictId = dictIds.find(id => entry.getKey.startsWith(id._1))
- if (dictId.isDefined) {
- dbDictSize = dbDictSize + cache.getMemorySize
- val memorySize = tableMapDictSize.get(dictId.get._2)
- if (memorySize.isEmpty) {
- tableMapDictSize.put(dictId.get._2, cache.getMemorySize)
- } else {
- tableMapDictSize.put(dictId.get._2, memorySize.get + cache.getMemorySize)
- }
- }
- }
- }
- if (tableMapIndexSize.isEmpty && tableMapDatamapSize.isEmpty && tableMapDictSize.isEmpty) {
- Seq(
- Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
- bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
- Row(currentDatabase, "ALL", bytesToDisplaySize(0),
- bytesToDisplaySize(0), bytesToDisplaySize(0)))
- } else {
- val tableList = tableMapIndexSize
- .map(_._1)
- .toSeq
- .union(tableMapDictSize.map(_._1).toSeq)
- .distinct
- .sorted
- .map { uniqueName =>
- val values = uniqueName.split("\\.")
- val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
- val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
- val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
- Row(values(0), values(1), bytesToDisplaySize(indexSize),
- bytesToDisplaySize(datamapSize), bytesToDisplaySize(dictSize))
+ val tableList: Seq[Row] = carbonTables.map {
+ carbonTable =>
+ val tableResult = getTableCache(sparkSession, carbonTable)
+ var (indexSize, datamapSize) = (tableResult(0).getLong(1), 0L)
+ tableResult.drop(2).foreach {
+ row =>
+ indexSize += row.getLong(1)
+ datamapSize += row.getLong(2)
}
+ val dictSize = tableResult(1).getLong(1)
- Seq(
- Row("ALL", "ALL", bytesToDisplaySize(allIndexSize),
- bytesToDisplaySize(allDatamapSize), bytesToDisplaySize(allDictSize)),
- Row(currentDatabase, "ALL", bytesToDisplaySize(dbIndexSize),
- bytesToDisplaySize(dbDatamapSize), bytesToDisplaySize(dbDictSize))
- ) ++ tableList
- }
- }
- }
-
- def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
- val cache = CacheProvider.getInstance().getCarbonCache()
- if (cache == null) {
- Seq.empty
- } else {
- val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
- var numIndexFilesCached = 0
+ dbIndexSize += indexSize
+ dbDictSize += dictSize
+ dbDatamapSize += datamapSize
- // Path -> Name, Type
- val datamapName = mutable.Map[String, (String, String)]()
- // Path -> Size
- val datamapSize = mutable.Map[String, Long]()
- // parent table
- datamapName.put(tablePath, ("", ""))
- datamapSize.put(tablePath, 0)
- // children tables
- for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
- val childTableName = carbonTable.getTableName + "_" + schema.getDataMapName
- val childTable = CarbonEnv
- .getCarbonTable(Some(carbonTable.getDatabaseName), childTableName)(sparkSession)
- val path = childTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
- val name = schema.getDataMapName
- val dmType = schema.getProviderName
- datamapName.put(path, (name, dmType))
- datamapSize.put(path, 0)
- }
- // index schemas
- for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- .asScala) {
- val path = tablePath + schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
- val name = schema.getDataMapName
- val dmType = schema.getProviderName
- datamapName.put(path, (name, dmType))
- datamapSize.put(path, 0)
- }
-
- var dictSize = 0L
-
- // dictionary column ids
- val dictIds = carbonTable
- .getAllDimensions
- .asScala
- .filter(_.isGlobalDictionaryEncoding)
- .map(_.getColumnId)
- .toArray
-
- val cacheIterator = cache.getCacheMap.entrySet().iterator()
- while (cacheIterator.hasNext) {
- val entry = cacheIterator.next()
- val cache = entry.getValue
-
- if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
- // index
- val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
- CarbonCommonConstants.FILE_SEPARATOR)
- val pathEntry = datamapSize.filter(entry => indexPath.startsWith(entry._1))
- if(pathEntry.nonEmpty) {
- val (path, size) = pathEntry.iterator.next()
- datamapSize.put(path, size + cache.getMemorySize)
- }
- if(indexPath.startsWith(tablePath)) {
- numIndexFilesCached += 1
+ val tableName = if (!carbonTable.isTransactionalTable) {
+ carbonTable.getTableName + " (external table)"
}
- } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
- // bloom datamap
- val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
- CarbonCommonConstants.FILE_SEPARATOR)
- val pathEntry = datamapSize.filter(entry => shardPath.contains(entry._1))
- if(pathEntry.nonEmpty) {
- val (path, size) = pathEntry.iterator.next()
- datamapSize.put(path, size + cache.getMemorySize)
+ else {
+ carbonTable.getTableName
}
- } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
- // dictionary
- val dictId = dictIds.find(id => entry.getKey.startsWith(id))
- if (dictId.isDefined) {
- dictSize = dictSize + cache.getMemorySize
+ (currentDatabase, tableName, indexSize, datamapSize, dictSize)
+ }.collect {
+ case (db, table, indexSize, datamapSize, dictSize) if !((indexSize == 0) &&
+ (datamapSize == 0) &&
+ (dictSize == 0)) =>
+ Row(db, table, indexSize, datamapSize, dictSize)
+ }
+
+ // Scan whole cache and fill the entries for All-Database-All-Tables
+ var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+ cache.getCacheMap.asScala.foreach {
+ case (_, cacheable) =>
+ cacheable match {
+ case _: BlockletDataMapIndexWrapper =>
+ allIndexSize += cacheable.getMemorySize
+ case _: BloomCacheKeyValue.CacheValue =>
+ allDatamapSize += cacheable.getMemorySize
+ case _: AbstractColumnDictionaryInfo =>
+ allDictSize += cacheable.getMemorySize
}
- }
}
- // get all index files
- val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath)
- val numIndexFilesAll = CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier)
- .asScala.map {
- segment =>
- segment.getCommittedIndexFile
- }.flatMap {
- indexFilesMap => indexFilesMap.keySet().toArray
- }.size
+ Seq(
+ Row("ALL", "ALL", allIndexSize, allDatamapSize, allDictSize),
+ Row(currentDatabase, "ALL", dbIndexSize, dbDatamapSize, dbDictSize)
+ ) ++ tableList
+ }
+ }
- var result = Seq(
- Row("Index", bytesToDisplaySize(datamapSize.get(tablePath).get),
- numIndexFilesCached + "/" + numIndexFilesAll + " index files cached"),
- Row("Dictionary", bytesToDisplaySize(dictSize), "")
- )
- for ((path, size) <- datamapSize) {
- if (path != tablePath) {
- val (dmName, dmType) = datamapName.get(path).get
- result = result :+ Row(dmName, bytesToDisplaySize(size), dmType)
- }
- }
- result
+ def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+ val cache = CacheProvider.getInstance().getCarbonCache
+ val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
+ val operationContext = new OperationContext
+ // datamapName -> (datamapProviderName, indexSize, datamapSize)
+ val currentTableSizeMap = scala.collection.mutable.Map[String, (String, String, Long, Long)]()
+ operationContext.setProperty(carbonTable.getTableUniqueName, currentTableSizeMap)
+ OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
+
+ // Get all Index files for the specified table.
+ val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
+ val indexFilesInCache: List[String] = allIndexFiles.filter {
+ indexFile =>
+ cache.get(indexFile) != null
+ }
+ val sizeOfIndexFilesInCache: Long = indexFilesInCache.map {
+ indexFile =>
+ cache.get(indexFile).getMemorySize
+ }.sum
+
+ // Extract dictionary keys for the table and create cache keys from those
+ val dictKeys = CacheUtil.getAllDictCacheKeys(carbonTable)
+ val sizeOfDictInCache = dictKeys.collect {
+ case dictKey if cache.get(dictKey) != null =>
+ cache.get(dictKey).getMemorySize
+ }.sum
+
+ // Assemble result for all the datamaps for the table
+ val otherDatamaps = operationContext.getProperty(carbonTable.getTableUniqueName)
+ .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+ val otherDatamapsResults: Seq[Row] = otherDatamaps.map {
+ case (name, (provider, indexSize, dmSize)) =>
+ Row(name, indexSize, dmSize, provider)
+ }.toSeq
+
+ var comments = indexFilesInCache.size + "/" + allIndexFiles.size + " index files cached"
+ if (!carbonTable.isTransactionalTable) {
+ comments += " (external table)"
}
+ Seq(
+ Row("Index", sizeOfIndexFilesInCache, comments),
+ Row("Dictionary", sizeOfDictInCache, "")
+ ) ++ otherDatamapsResults
}
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
if (tableIdentifier.isEmpty) {
- showAllTablesCache(sparkSession)
+ /**
+ * Assemble result for database
+ */
+ val result = getAllTablesCache(sparkSession)
+ result.map {
+ row =>
+ Row(row.get(0), row.get(1), bytesToDisplaySize(row.getLong(2)),
+ bytesToDisplaySize(row.getLong(3)), bytesToDisplaySize(row.getLong(4)))
+ }
} else {
+ /**
+ * Assemble result for table
+ */
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
- if (carbonTable.isChildDataMap) {
- throw new UnsupportedOperationException("Operation not allowed on child table.")
+ if (CacheProvider.getInstance().getCarbonCache == null) {
+ return Seq.empty
+ }
+ val rawResult = getTableCache(sparkSession, carbonTable)
+ val result = rawResult.slice(0, 2) ++
+ rawResult.drop(2).map {
+ row =>
+ Row(row.get(0), row.getLong(1) + row.getLong(2), row.get(3))
+ }
+ result.map {
+ row =>
+ Row(row.get(0), bytesToDisplaySize(row.getLong(1)), row.get(2))
}
- showTableCache(sparkSession, carbonTable)
}
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
new file mode 100644
index 0000000..6c8bb54
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCacheEventListeners.scala
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener.LOGGER
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext,
+ OperationEventListener}
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+object DropCachePreAggEventListener extends OperationEventListener {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ event match {
+ case dropCacheEvent: DropTableCacheEvent =>
+ val carbonTable = dropCacheEvent.carbonTable
+ val sparkSession = dropCacheEvent.sparkSession
+ val internalCall = dropCacheEvent.internalCall
+ if (carbonTable.isChildDataMap && !internalCall) {
+ throw new UnsupportedOperationException("Operation not allowed on child table.")
+ }
+
+ if (carbonTable.hasDataMapSchema) {
+ val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
+ .filter(_.getRelationIdentifier != null)
+ for (childSchema <- childrenSchemas) {
+ val childTable =
+ CarbonEnv.getCarbonTable(
+ TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+ Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
+ try {
+ val dropCacheCommandForChildTable =
+ CarbonDropCacheCommand(
+ TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
+ internalCall = true)
+ dropCacheCommandForChildTable.processMetadata(sparkSession)
+ }
+ catch {
+ case e: Exception =>
+ LOGGER.warn(
+ s"Clean cache for PreAgg table ${ childTable.getTableName } failed.", e)
+ }
+ }
+ }
+ }
+ }
+}
+
+
+object DropCacheBloomEventListener extends OperationEventListener {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ event match {
+ case dropCacheEvent: DropTableCacheEvent =>
+ val carbonTable = dropCacheEvent.carbonTable
+ val cache = CacheProvider.getInstance().getCarbonCache
+ val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+ .asScala.toList
+ val segments = CarbonDataMergerUtil
+ .getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala.toList
+
+ datamaps.foreach {
+ case datamap if datamap.getProviderName
+ .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) =>
+ try {
+ // Get datamap keys
+ val datamapKeys = CacheUtil.getBloomCacheKeys(carbonTable, datamap)
+
+ // remove datamap keys from cache
+ cache.removeAll(datamapKeys.asJava)
+ } catch {
+ case e: Exception =>
+ LOGGER.warn(
+ s"Clean cache for Bloom datamap ${datamap.getDataMapName} failed.", e)
+ }
+ case _ =>
+ }
+ }
+ }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
deleted file mode 100644
index 3d03c60..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.command.cache
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.catalyst.TableIdentifier
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext,
- OperationEventListener}
-
-object DropCachePreAggEventListener extends OperationEventListener {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * Called on a specified event occurrence
- *
- * @param event
- * @param operationContext
- */
- override protected def onEvent(event: Event,
- operationContext: OperationContext): Unit = {
-
- event match {
- case dropCacheEvent: DropCacheEvent =>
- val carbonTable = dropCacheEvent.carbonTable
- val sparkSession = dropCacheEvent.sparkSession
- val internalCall = dropCacheEvent.internalCall
- if (carbonTable.isChildDataMap && !internalCall) {
- throw new UnsupportedOperationException("Operation not allowed on child table.")
- }
-
- if (carbonTable.hasDataMapSchema) {
- val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
- .filter(_.getRelationIdentifier != null)
- for (childSchema <- childrenSchemas) {
- val childTable =
- CarbonEnv.getCarbonTable(
- TableIdentifier(childSchema.getRelationIdentifier.getTableName,
- Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
- val dropCacheCommandForChildTable =
- CarbonDropCacheCommand(
- TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
- internalCall = true)
- dropCacheCommandForChildTable.processMetadata(sparkSession)
- }
- }
- }
-
- }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
new file mode 100644
index 0000000..70f63a4
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/ShowCacheEventListeners.scala
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.cache
+
+import java.util
+import java.util.{HashSet, Set}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.CarbonEnv
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+object ShowCachePreAggEventListener extends OperationEventListener {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ event match {
+ case showTableCacheEvent: ShowTableCacheEvent =>
+ val carbonTable = showTableCacheEvent.carbonTable
+ val sparkSession = showTableCacheEvent.sparkSession
+ val internalCall = showTableCacheEvent.internalCall
+ if (carbonTable.isChildDataMap && !internalCall) {
+ throw new UnsupportedOperationException("Operation not allowed on child table.")
+ }
+
+ val currentTableSizeMap = operationContext.getProperty(carbonTable.getTableUniqueName)
+ .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+
+ if (carbonTable.hasDataMapSchema) {
+ val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
+ .filter(_.getRelationIdentifier != null)
+ for (childSchema <- childrenSchemas) {
+ val datamapName = childSchema.getDataMapName
+ val datamapProvider = childSchema.getProviderName
+ val childCarbonTable = CarbonEnv.getCarbonTable(
+ TableIdentifier(childSchema.getRelationIdentifier.getTableName,
+ Some(carbonTable.getDatabaseName)))(sparkSession)
+
+ val resultForChild = CarbonShowCacheCommand(None, true)
+ .getTableCache(sparkSession, childCarbonTable)
+ val datamapSize = resultForChild.head.getLong(1)
+ currentTableSizeMap.put(datamapName, (datamapProvider, datamapSize, 0L))
+ }
+ }
+ }
+ }
+}
+
+
+object ShowCacheBloomEventListener extends OperationEventListener {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ event match {
+ case showTableCacheEvent: ShowTableCacheEvent =>
+ val carbonTable = showTableCacheEvent.carbonTable
+ val cache = CacheProvider.getInstance().getCarbonCache
+ val currentTableSizeMap = operationContext.getProperty(carbonTable.getTableUniqueName)
+ .asInstanceOf[mutable.Map[String, (String, Long, Long)]]
+
+ // Extract all datamaps for the table
+ val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+ .asScala
+
+ datamaps.foreach {
+ case datamap if datamap.getProviderName
+ .equalsIgnoreCase(DataMapClassProvider.BLOOMFILTER.getShortName) =>
+
+ // Get datamap keys
+ val datamapKeys = CacheUtil.getBloomCacheKeys(carbonTable, datamap)
+
+ // calculate the memory size if key exists in cache
+ val datamapSize = datamapKeys.collect {
+ case key if cache.get(key) != null =>
+ cache.get(key).getMemorySize
+ }.sum
+
+ // put the datmap size into main table's map.
+ currentTableSizeMap
+ .put(datamap.getDataMapName, (datamap.getProviderName, 0L, datamapSize))
+
+ case _ =>
+ }
+ }
+ }
+}