Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/03/05 10:39:04 UTC
[carbondata] branch master updated: [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
This is an automated email from the ASF dual-hosted git repository.
xuchuanyin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 718be37 [CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
718be37 is described below
commit 718be37295a55de3317191118bf74720e4de800f
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu Jan 17 15:39:33 2019 +0800
[CARBONDATA-3305] Support show metacache command to list the cache sizes for all tables
>>> SHOW METACACHE
+--------+--------+----------+------------+---------------+
|Database|Table |Index size|Datamap size|Dictionary size|
+--------+--------+----------+------------+---------------+
|ALL |ALL |842 Bytes |982 Bytes |80.34 KB |
|default |ALL |842 Bytes |982 Bytes |80.34 KB |
|default |t1 |225 Bytes |982 Bytes |0 |
|default |t1_dpagg|259 Bytes |0 |0 |
|default |t2 |358 Bytes |0 |80.34 KB |
+--------+--------+----------+------------+---------------+
>>> SHOW METACACHE FOR TABLE t1
+----------+---------+----------------------+
|Field |Size |Comment |
+----------+---------+----------------------+
|Index |225 Bytes|1/1 index files cached|
|Dictionary|0 | |
|dpagg |259 Bytes|preaggregate |
|dblom |982 Bytes|bloomfilter |
+----------+---------+----------------------+
>>> SHOW METACACHE FOR TABLE t2
+----------+---------+----------------------+
|Field |Size |Comment |
+----------+---------+----------------------+
|Index |358 Bytes|2/2 index files cached|
|Dictionary|80.34 KB | |
+----------+---------+----------------------+
This closes #3078
---
.../carbondata/core/cache/CacheProvider.java | 4 +
.../carbondata/core/cache/CarbonLRUCache.java | 4 +
docs/ddl-of-carbondata.md | 21 ++
.../sql/commands/TestCarbonShowCacheCommand.scala | 163 +++++++++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 +
.../command/cache/CarbonShowCacheCommand.scala | 312 +++++++++++++++++++++
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 12 +-
7 files changed, 515 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index 99b1693..deb48e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -195,4 +195,8 @@ public class CacheProvider {
}
cacheTypeToCacheMap.clear();
}
+
+ public CarbonLRUCache getCarbonCache() {
+ return carbonLRUCache;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 87254e3..74ff8a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -305,4 +305,8 @@ public final class CarbonLRUCache {
lruCacheMap.clear();
}
}
+
+ public Map<String, Cacheable> getCacheMap() {
+ return lruCacheMap;
+ }
}
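These two accessors expose the process-wide LRU cache and its backing map; the new command iterates that map and attributes each entry's memory size. A minimal sketch of totalling the cached memory through them (all names are from this patch; note that getCarbonCache() may return null, which the command below also guards against):

```scala
import scala.collection.JavaConverters._
import org.apache.carbondata.core.cache.CacheProvider

// Sketch only, using the accessors added above. getCarbonCache() can
// return null if the LRU cache has not been created yet.
val lru = CacheProvider.getInstance().getCarbonCache
val totalBytes =
  if (lru == null) 0L
  else lru.getCacheMap.asScala.values.map(_.getMemorySize).sum
println(s"LRU cache currently holds $totalBytes bytes")
```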
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 0d0e5bd..3476475 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -67,6 +67,7 @@ CarbonData DDL statements are documented here, which include:
* [SPLIT PARTITION](#split-a-partition)
* [DROP PARTITION](#drop-a-partition)
* [BUCKETING](#bucketing)
+* [CACHE](#cache)
## CREATE TABLE
@@ -1088,4 +1089,24 @@ Users can specify which columns to include and exclude for local dictionary gene
TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName')
```
+## CACHE
+  CarbonData internally uses LRU caching to improve performance. Users can inspect the current
+  cache usage in memory through the following command:
+
+  ```sql
+  SHOW METACACHE
+  ```
+
+  This shows the overall memory consumed by the cache per category - index files, dictionary and
+  datamaps. It also shows the cache usage of all tables and their child tables in the current
+  database.
+
+  ```sql
+  SHOW METACACHE ON TABLE tableName
+  ```
+
+  This shows detailed information on the cache usage of the table `tableName`: its carbonindex
+  files, its dictionary files, its datamaps and its child tables.
+
+  This command is not allowed on child tables.
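+
+  For example, index files are loaded into the cache lazily by queries, so a typical inspection
+  sequence looks like this (the table name is illustrative):
+
+  ```sql
+  -- the first query loads the table's index files into the LRU cache
+  SELECT count(*) FROM cache_db.cache_2;
+  -- then inspect what is cached for that table
+  SHOW METACACHE ON TABLE cache_db.cache_2;
+  ```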
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
new file mode 100644
index 0000000..0e1cd00
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sql.commands
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
+ override protected def beforeAll(): Unit = {
+ // use new database
+ sql("drop database if exists cache_db cascade").collect()
+ sql("drop database if exists cache_empty_db cascade").collect()
+ sql("create database cache_db").collect()
+ sql("create database cache_empty_db").collect()
+ dropTable
+ sql("use cache_db").collect()
+ sql(
+ """
+ | CREATE TABLE cache_db.cache_1
+ | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+ | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+ | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+ | salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
+ """.stripMargin)
+ // bloom
+ sql("CREATE DATAMAP IF NOT EXISTS cache_1_bloom ON TABLE cache_db.cache_1 USING 'bloomfilter' " +
+ "DMPROPERTIES('INDEX_COLUMNS'='deptno')")
+ sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_1 ")
+
+ sql(
+ """
+ | CREATE TABLE cache_2
+ | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+ | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+ | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+ | salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_db.cache_2 ")
+ sql("insert into table cache_2 select * from cache_1").collect()
+
+ sql(
+ """
+ | CREATE TABLE cache_3
+ | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+ | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+ | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+ | salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_3 ")
+
+ // use default database
+ sql("use default").collect()
+ sql(
+ """
+ | CREATE TABLE cache_4
+ | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+ | workgroupcategoryname String, deptno int, deptname String, projectcode int,
+ | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+ | salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("insert into table cache_4 select * from cache_db.cache_2").collect()
+
+ // standard partition table
+ sql(
+ """
+ | CREATE TABLE cache_5
+ | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
+ | workgroupcategoryname String, deptname String, projectcode int,
+ | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
+ | salary int)
+ | PARTITIONED BY (deptno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ "insert into table cache_5 select empno,empname,designation,doj,workgroupcategory," +
+ "workgroupcategoryname,deptname,projectcode,projectjoindate,projectenddate,attendance," +
+ "utilization,salary,deptno from cache_4").collect()
+
+ // datamap
+ sql("create datamap cache_4_count on table cache_4 using 'preaggregate' as " +
+ "select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname")
+
+ // count star to cache index
+ sql("select max(deptname) from cache_db.cache_1").collect()
+ sql("SELECT deptno FROM cache_db.cache_1 where deptno=10").collect()
+ sql("select count(*) from cache_db.cache_2").collect()
+ sql("select count(*) from cache_4").collect()
+ sql("select count(*) from cache_5").collect()
+ sql("select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname").collect()
+ }
+
+
+ override protected def afterAll(): Unit = {
+ sql("use default").collect()
+ dropTable
+ }
+
+ private def dropTable = {
+ sql("DROP TABLE IF EXISTS cache_db.cache_1")
+ sql("DROP TABLE IF EXISTS cache_db.cache_2")
+ sql("DROP TABLE IF EXISTS cache_db.cache_3")
+ sql("DROP TABLE IF EXISTS default.cache_4")
+ sql("DROP TABLE IF EXISTS default.cache_5")
+ }
+
+ test("show cache") {
+ sql("use cache_empty_db").collect()
+ val result1 = sql("show metacache").collect()
+ assertResult(2)(result1.length)
+ assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1))
+
+ sql("use cache_db").collect()
+ val result2 = sql("show metacache").collect()
+ assertResult(4)(result2.length)
+
+ sql("use default").collect()
+ val result3 = sql("show metacache").collect()
+ val dataMapCacheInfo = result3
+ .map(row => row.getString(1))
+ .filter(table => table.equals("cache_4_cache_4_count"))
+ assertResult(1)(dataMapCacheInfo.length)
+ }
+
+ test("show metacache on table") {
+ sql("use cache_db").collect()
+ val result1 = sql("show metacache on table cache_1").collect()
+ assertResult(3)(result1.length)
+
+ val result2 = sql("show metacache on table cache_db.cache_2").collect()
+ assertResult(2)(result2.length)
+
+ checkAnswer(sql("show metacache on table cache_db.cache_3"),
+ Seq(Row("Index", "0 bytes", "0/1 index files cached"), Row("Dictionary", "0 bytes", "")))
+
+ val result4 = sql("show metacache on table default.cache_4").collect()
+ assertResult(3)(result4.length)
+
+ sql("use default").collect()
+ val result5 = sql("show metacache on table cache_5").collect()
+ assertResult(2)(result5.length)
+ }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index dc75243..e03bebd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -155,6 +155,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val HISTORY = carbonKeyWord("HISTORY")
protected val SEGMENTS = carbonKeyWord("SEGMENTS")
protected val SEGMENT = carbonKeyWord("SEGMENT")
+ protected val METACACHE = carbonKeyWord("METACACHE")
protected val STRING = carbonKeyWord("STRING")
protected val INTEGER = carbonKeyWord("INTEGER")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
new file mode 100644
index 0000000..e937c32
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils.byteCountToDisplaySize
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+
+/**
+ * SHOW CACHE
+ */
+case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
+ extends MetadataCommand {
+
+ override def output: Seq[AttributeReference] = {
+ if (tableIdentifier.isEmpty) {
+ Seq(
+ AttributeReference("Database", StringType, nullable = false)(),
+ AttributeReference("Table", StringType, nullable = false)(),
+ AttributeReference("Index size", StringType, nullable = false)(),
+ AttributeReference("Datamap size", StringType, nullable = false)(),
+ AttributeReference("Dictionary size", StringType, nullable = false)())
+ } else {
+ Seq(
+ AttributeReference("Field", StringType, nullable = false)(),
+ AttributeReference("Size", StringType, nullable = false)(),
+ AttributeReference("Comment", StringType, nullable = false)())
+ }
+ }
+
+ override protected def opName: String = "SHOW CACHE"
+
+ def showAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
+ val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
+ val cache = CacheProvider.getInstance().getCarbonCache()
+ if (cache == null) {
+ Seq(Row("ALL", "ALL", 0L, 0L, 0L),
+ Row(currentDatabase, "ALL", 0L, 0L, 0L))
+ } else {
+ val tableIdents = sparkSession.sessionState.catalog.listTables(currentDatabase).toArray
+ val dbLocation = CarbonEnv.getDatabaseLocation(currentDatabase, sparkSession)
+ val tempLocation = dbLocation.replace(
+ CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+ val tablePaths = tableIdents.map { tableIdent =>
+ (tempLocation + CarbonCommonConstants.FILE_SEPARATOR +
+ tableIdent.table + CarbonCommonConstants.FILE_SEPARATOR,
+ CarbonEnv.getDatabaseName(tableIdent.database)(sparkSession) + "." + tableIdent.table)
+ }
+
+ val dictIds = tableIdents
+ .map { tableIdent =>
+ var table: CarbonTable = null
+ try {
+ table = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
+ } catch {
+            case _: Exception => // skip tables that cannot be resolved
+ }
+ table
+ }
+ .filter(_ != null)
+ .flatMap { table =>
+ table
+ .getAllDimensions
+ .asScala
+ .filter(_.isGlobalDictionaryEncoding)
+ .toArray
+ .map(dim => (dim.getColumnId, table.getDatabaseName + "." + table.getTableName))
+ }
+
+ // all databases
+ var (allIndexSize, allDatamapSize, allDictSize) = (0L, 0L, 0L)
+ // current database
+ var (dbIndexSize, dbDatamapSize, dbDictSize) = (0L, 0L, 0L)
+ val tableMapIndexSize = mutable.HashMap[String, Long]()
+ val tableMapDatamapSize = mutable.HashMap[String, Long]()
+ val tableMapDictSize = mutable.HashMap[String, Long]()
+ val cacheIterator = cache.getCacheMap.entrySet().iterator()
+ while (cacheIterator.hasNext) {
+ val entry = cacheIterator.next()
+ val cache = entry.getValue
+ if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
+ // index
+ allIndexSize = allIndexSize + cache.getMemorySize
+ val indexPath = entry.getKey.replace(
+ CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+ val tablePath = tablePaths.find(path => indexPath.startsWith(path._1))
+ if (tablePath.isDefined) {
+ dbIndexSize = dbIndexSize + cache.getMemorySize
+ val memorySize = tableMapIndexSize.get(tablePath.get._2)
+ if (memorySize.isEmpty) {
+ tableMapIndexSize.put(tablePath.get._2, cache.getMemorySize)
+ } else {
+ tableMapIndexSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
+ }
+ }
+ } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
+ // bloom datamap
+ allDatamapSize = allDatamapSize + cache.getMemorySize
+ val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+ CarbonCommonConstants.FILE_SEPARATOR)
+ val tablePath = tablePaths.find(path => shardPath.contains(path._1))
+ if (tablePath.isDefined) {
+ dbDatamapSize = dbDatamapSize + cache.getMemorySize
+ val memorySize = tableMapDatamapSize.get(tablePath.get._2)
+ if (memorySize.isEmpty) {
+ tableMapDatamapSize.put(tablePath.get._2, cache.getMemorySize)
+ } else {
+ tableMapDatamapSize.put(tablePath.get._2, memorySize.get + cache.getMemorySize)
+ }
+ }
+ } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
+ // dictionary
+ allDictSize = allDictSize + cache.getMemorySize
+ val dictId = dictIds.find(id => entry.getKey.startsWith(id._1))
+ if (dictId.isDefined) {
+ dbDictSize = dbDictSize + cache.getMemorySize
+ val memorySize = tableMapDictSize.get(dictId.get._2)
+ if (memorySize.isEmpty) {
+ tableMapDictSize.put(dictId.get._2, cache.getMemorySize)
+ } else {
+ tableMapDictSize.put(dictId.get._2, memorySize.get + cache.getMemorySize)
+ }
+ }
+ }
+ }
+ if (tableMapIndexSize.isEmpty && tableMapDatamapSize.isEmpty && tableMapDictSize.isEmpty) {
+ Seq(
+ Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
+ byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
+ Row(currentDatabase, "ALL", "0", "0", "0"))
+ } else {
+ val tableList = tableMapIndexSize
+ .map(_._1)
+ .toSeq
+ .union(tableMapDictSize.map(_._1).toSeq)
+ .distinct
+ .sorted
+ .map { uniqueName =>
+ val values = uniqueName.split("\\.")
+ val indexSize = tableMapIndexSize.getOrElse(uniqueName, 0L)
+ val datamapSize = tableMapDatamapSize.getOrElse(uniqueName, 0L)
+ val dictSize = tableMapDictSize.getOrElse(uniqueName, 0L)
+ Row(values(0), values(1), byteCountToDisplaySize(indexSize),
+ byteCountToDisplaySize(datamapSize), byteCountToDisplaySize(dictSize))
+ }
+
+ Seq(
+ Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
+ byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
+ Row(currentDatabase, "ALL", byteCountToDisplaySize(dbIndexSize),
+ byteCountToDisplaySize(dbDatamapSize), byteCountToDisplaySize(dbDictSize))
+ ) ++ tableList
+ }
+ }
+ }
+
+ def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
+ val tableName = carbonTable.getTableName
+ val databaseName = carbonTable.getDatabaseName
+ val cache = CacheProvider.getInstance().getCarbonCache()
+ if (cache == null) {
+ Seq.empty
+ } else {
+ val dbLocation = CarbonEnv
+ .getDatabaseLocation(databaseName, sparkSession)
+ .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+ val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
+ tableName + CarbonCommonConstants.FILE_SEPARATOR
+ var numIndexFilesCached = 0
+
+ // Path -> Name, Type
+ val datamapName = mutable.Map[String, (String, String)]()
+ // Path -> Size
+ val datamapSize = mutable.Map[String, Long]()
+ // parent table
+ datamapName.put(tablePath, ("", ""))
+ datamapSize.put(tablePath, 0)
+ // children tables
+ for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
+ val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + "_" +
+ schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
+ val name = schema.getDataMapName
+ val dmType = schema.getProviderName
+ datamapName.put(path, (name, dmType))
+ datamapSize.put(path, 0)
+ }
+ // index schemas
+ for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+ .asScala) {
+ val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName +
+ CarbonCommonConstants.FILE_SEPARATOR + schema.getDataMapName +
+ CarbonCommonConstants.FILE_SEPARATOR
+ val name = schema.getDataMapName
+ val dmType = schema.getProviderName
+ datamapName.put(path, (name, dmType))
+ datamapSize.put(path, 0)
+ }
+
+ var dictSize = 0L
+
+ // dictionary column ids
+ val dictIds = carbonTable
+ .getAllDimensions
+ .asScala
+ .filter(_.isGlobalDictionaryEncoding)
+ .map(_.getColumnId)
+ .toArray
+
+ val cacheIterator = cache.getCacheMap.entrySet().iterator()
+ while (cacheIterator.hasNext) {
+ val entry = cacheIterator.next()
+ val cache = entry.getValue
+
+ if (cache.isInstanceOf[BlockletDataMapIndexWrapper]) {
+ // index
+ val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+ CarbonCommonConstants.FILE_SEPARATOR)
+ val pathEntry = datamapSize.filter(entry => indexPath.startsWith(entry._1))
+ if(pathEntry.nonEmpty) {
+ val (path, size) = pathEntry.iterator.next()
+ datamapSize.put(path, size + cache.getMemorySize)
+ }
+ if(indexPath.startsWith(tablePath)) {
+ numIndexFilesCached += 1
+ }
+ } else if (cache.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
+ // bloom datamap
+ val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+ CarbonCommonConstants.FILE_SEPARATOR)
+ val pathEntry = datamapSize.filter(entry => shardPath.contains(entry._1))
+ if(pathEntry.nonEmpty) {
+ val (path, size) = pathEntry.iterator.next()
+ datamapSize.put(path, size + cache.getMemorySize)
+ }
+ } else if (cache.isInstanceOf[AbstractColumnDictionaryInfo]) {
+ // dictionary
+ val dictId = dictIds.find(id => entry.getKey.startsWith(id))
+ if (dictId.isDefined) {
+ dictSize = dictSize + cache.getMemorySize
+ }
+ }
+ }
+
+ // get all index files
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath)
+ val numIndexFilesAll = CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier)
+ .asScala.map {
+ segment =>
+ segment.getCommittedIndexFile
+ }.flatMap {
+ indexFilesMap => indexFilesMap.keySet().toArray
+ }.size
+
+ var result = Seq(
+ Row("Index", byteCountToDisplaySize(datamapSize.get(tablePath).get),
+ numIndexFilesCached + "/" + numIndexFilesAll + " index files cached"),
+ Row("Dictionary", byteCountToDisplaySize(dictSize), "")
+ )
+ for ((path, size) <- datamapSize) {
+ if (path != tablePath) {
+ val (dmName, dmType) = datamapName.get(path).get
+ result = result :+ Row(dmName, byteCountToDisplaySize(size), dmType)
+ }
+ }
+ result
+ }
+ }
+
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ if (tableIdentifier.isEmpty) {
+ showAllTablesCache(sparkSession)
+ } else {
+ val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
+ if (carbonTable.isChildDataMap) {
+ throw new UnsupportedOperationException("Operation not allowed on child table.")
+ }
+ showTableCache(sparkSession, carbonTable)
+ }
+ }
+}
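End to end, the command runs like any other Carbon DDL statement. A hedged usage sketch (the `spark` session variable and table names are illustrative; the tests above issue the same SQL):

```scala
// Illustrative only: a Carbon-enabled SparkSession named `spark` is assumed.
spark.sql("SELECT count(*) FROM cache_db.cache_1").collect() // warms the index cache
spark.sql("SHOW METACACHE").show(false)                      // per-database summary
spark.sql("SHOW METACACHE ON TABLE cache_db.cache_1").show(false)
```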
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index d1023fa..a2923b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -33,11 +33,11 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand
import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
-import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
@@ -77,7 +77,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val startCommand: Parser[LogicalPlan] =
loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
- alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli
+ alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli | cacheManagement
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -94,6 +94,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val stream: Parser[LogicalPlan] =
createStream | dropStream | showStreams
+ protected lazy val cacheManagement: Parser[LogicalPlan] =
+ showCache
+
protected lazy val alterAddPartition: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
"(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
@@ -494,6 +497,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
showHistory.isDefined)
}
+ protected lazy val showCache: Parser[LogicalPlan] =
+ SHOW ~> METACACHE ~> opt(ontable) <~ opt(";") ^^ {
+ case table =>
+ CarbonShowCacheCommand(table)
+ }
protected lazy val cli: Parser[LogicalPlan] =
(CARBONCLI ~> FOR ~> TABLE) ~> (ident <~ ".").? ~ ident ~