You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/28 11:07:58 UTC
carbondata git commit: [CARBONDATA-2647] [CARBONDATA-2648] Add
support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table
properties
Repository: carbondata
Updated Branches:
refs/heads/master e7103397d -> a0350e100
[CARBONDATA-2647] [CARBONDATA-2648] Add support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table properties
Things done as part of this PR
Support for configuring COLUMN_META_CACHE in create and alter table set properties DDL.
Support for configuring CACHE_LEVEL in create and alter table set properties DDL.
Describe formatted display support for COLUMN_META_CACHE and CACHE_LEVEL
Any interfaces changed?
Create Table Syntax
CREATE TABLE [dbName].tableName (col1 String, col2 String, col3 int,…) STORED BY 'carbondata' TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')
Alter Table set properties Syntax
ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')
This closes #2418
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a0350e10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a0350e10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a0350e10
Branch: refs/heads/master
Commit: a0350e100e262fef85409b3b4fe2a488d688f706
Parents: e710339
Author: manishgupta88 <to...@gmail.com>
Authored: Tue Jun 26 20:11:14 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 28 16:37:43 2018 +0530
----------------------------------------------------------------------
.../dictionary/ManageDictionaryAndBTree.java | 18 ++
.../core/constants/CarbonCommonConstants.java | 12 +
.../core/metadata/schema/table/CarbonTable.java | 38 +++
...ithColumnMetCacheAndCacheLevelProperty.scala | 168 ++++++++++++
...ithColumnMetCacheAndCacheLevelProperty.scala | 155 +++++++++++
.../describeTable/TestDescribeTable.scala | 4 +-
.../carbondata/spark/util/CommonUtil.scala | 91 ++++++-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 31 ++-
.../table/CarbonDescribeFormattedCommand.scala | 8 +
.../org/apache/spark/util/AlterTableUtil.scala | 263 +++++++++++++++++--
10 files changed, 766 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a7d6027..b54fb14 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -149,4 +149,22 @@ public class ManageDictionaryAndBTree {
}
}
+ /**
+  * Invalidates the BTree cache entries of every segment listed in the load metadata of the
+  * given table. The actual eviction is delegated to the overload of this method that takes
+  * the table identifier and an array of segment names.
+  *
+  * @param carbonTable table whose load metadata is read to collect the segment names
+  */
+ public static void invalidateBTreeCache(CarbonTable carbonTable) {
+   LoadMetadataDetails[] details =
+       SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+   if (details.length == 0) {
+     // no load metadata entries were read, so there is no segment to invalidate
+     return;
+   }
+   String[] segmentNames = new String[details.length];
+   for (int index = 0; index < details.length; index++) {
+     segmentNames[index] = details[index].getLoadName();
+   }
+   invalidateBTreeCache(carbonTable.getAbsoluteTableIdentifier(), segmentNames);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 118ff28..da40862 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1839,6 +1839,18 @@ public final class CarbonCommonConstants {
* the node minimum load data default value
*/
public static final int CARBON_LOAD_MIN_SIZE_DEFAULT = 256;
+ /**
+ * table property to be specified for caching min/max of required columns; the value is a
+ * comma separated list of column names
+ */
+ public static final String COLUMN_META_CACHE = "column_meta_cache";
+ /**
+ * table property to be specified for the caching level (BLOCK/BLOCKLET)
+ */
+ public static final String CACHE_LEVEL = "cache_level";
+ /**
+ * default value for the cache level
+ */
+ public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
private CarbonCommonConstants() {
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 68bd749..710c7af 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1165,4 +1165,42 @@ public class CarbonTable implements Serializable {
table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
}
}
+
+ /**
+  * Returns the columns named in the COLUMN_META_CACHE table property that still match a
+  * dimension or a measure registered for the given table name.
+  *
+  * @param tableName key used to look up the dimension and the measure lists
+  * @return the matching column names in the order they appear in the property value; an empty
+  *         list when the property is absent or empty
+  */
+ public List<String> getCachedColumns(String tableName) {
+   List<String> result = new ArrayList<>(tableDimensionsMap.size());
+   String configured =
+       tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.COLUMN_META_CACHE);
+   if (null == configured || configured.isEmpty()) {
+     return result;
+   }
+   List<CarbonDimension> dimensions = tableDimensionsMap.get(tableName);
+   List<CarbonMeasure> measures = tableMeasuresMap.get(tableName);
+   for (String column : configured.split(",")) {
+     // dimensions are searched first
+     boolean isDimension = false;
+     for (CarbonDimension dimension : dimensions) {
+       if (dimension.getColName().equals(column)) {
+         isDimension = true;
+         break;
+       }
+     }
+     // measures are only searched when no dimension carries the name
+     boolean isMeasure = false;
+     if (!isDimension) {
+       for (CarbonMeasure measure : measures) {
+         if (measure.getColName().equals(column)) {
+           isMeasure = true;
+           break;
+         }
+       }
+     }
+     // a name matching neither list is skipped, which keeps columns that have been dropped
+     // from the table out of the result
+     if (isDimension || isMeasure) {
+       result.add(column);
+     }
+   }
+   return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..dbe9c75
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * Test class validating ALTER TABLE SET/UNSET TBLPROPERTIES for the column_meta_cache and
+ * cache_level properties
+ */
+class TestAlterTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  /**
+   * Checks whether the fact table property `key` of the given table equals `expectedValue`
+   */
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  /**
+   * Drops both tables used by this suite
+   */
+  private def dropTable(): Unit = {
+    sql("drop table if exists alter_column_meta_cache")
+    sql("drop table if exists cache_level")
+  }
+
+  override def beforeAll: Unit = {
+    // drop table
+    dropTable()
+    // create table
+    sql("create table alter_column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>, c6 array<string>) stored by 'carbondata'")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+  }
+
+  test("validate column_meta_cache with only empty spaces - alter_column_meta_cache_01") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'=' ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - alter_column_meta_cache_02") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('COLUMN_meta_CachE'='c2,c3')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between columns - alter_column_meta_cache_03") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2, ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid columns - alter_column_meta_cache_04") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - alter_column_meta_cache_05") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - alter_column_meta_cache_06") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - alter_column_meta_cache_07") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - alter_column_meta_cache_08") {
+    intercept[RuntimeException] {
+      // c6 is the array<string> column; the struct column c5 is covered by the previous test
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c6,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - alter_column_meta_cache_09") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is set - alter_column_meta_cache_10") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2')")
+    val descResult = sql("describe formatted alter_column_meta_cache")
+    // the upper case label is checked because the lower case text is part of the table name
+    // alter_column_meta_cache and would presumably match even without the property row
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is already set - alter_column_meta_cache_11") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c3')")
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is not already set - alter_column_meta_cache_12") {
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - ALTER_CACHE_LEVEL_01") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'=' ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - ALTER_CACHE_LEVEL_02") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - ALTER_CACHE_LEVEL_03") {
+    sql("Alter table cache_level SET TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - ALTER_CACHE_LEVEL_04") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is set - ALTER_CACHE_LEVEL_05") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is not set - ALTER_CACHE_LEVEL_06") {
+    sql("Alter table cache_level UNSET TBLPROPERTIES('cache_level')")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  override def afterAll: Unit = {
+    // drop table
+    dropTable()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..8dadef9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * Test class validating CREATE TABLE with the column_meta_cache and cache_level properties
+ */
+class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  /**
+   * Checks whether the fact table property `key` of the given table equals `expectedValue`
+   */
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  test("validate column_meta_cache with only empty spaces - COLUMN_META_CACHE_01") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'=' ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - COLUMN_META_CACHE_02") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between columns - COLUMN_META_CACHE_03") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2, ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid columns - COLUMN_META_CACHE_04") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - COLUMN_META_CACHE_05") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - COLUMN_META_CACHE_06") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - COLUMN_META_CACHE_07") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - COLUMN_META_CACHE_08") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 array<string>) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - COLUMN_META_CACHE_09") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is set - COLUMN_META_CACHE_10") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is not set - COLUMN_META_CACHE_11") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata'")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - CACHE_LEVEL_01") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'=' ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - CACHE_LEVEL_02") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - CACHE_LEVEL_03") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - CACHE_LEVEL_04") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is set - CACHE_LEVEL_05") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is not set - CACHE_LEVEL_06") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  override def afterAll: Unit = {
+    // drop the tables created by the tests above so that they are not left behind
+    sql("drop table if exists column_meta_cache")
+    sql("drop table if exists cache_level")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
index 5598457..ceb0ac3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
@@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
test("test describe formatted table desc1") {
val resultCol = Seq("", "", "##Detailed Column property", "##Detailed Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database Name", "Last Update Time",
- "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
+ "SORT_COLUMNS", "SORT_SCOPE", "CACHE_LEVEL", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s"))
checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow)
- assert(sql("desc formatted desc1").count() == 22)
+ assert(sql("desc formatted desc1").count() == 23)
}
test("test describe formatted for partition table") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 3995aa7..4723e6b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -26,11 +26,11 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.Map
import scala.util.Random
-import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}
@@ -896,4 +896,93 @@ object CommonUtil {
CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
}
+ /**
+  * Validates the value given for the cache level property and stores its normalized form.
+  * The value is trimmed and upper-cased before it is compared with the supported levels
+  * (BLOCK and BLOCKLET); on success the normalized value is written back to the table
+  * properties under CarbonCommonConstants.CACHE_LEVEL.
+  *
+  * @param cacheLevel value configured for the cache level property
+  * @param tableProperties table properties that receive the normalized value
+  * @throws MalformedCarbonCommandException if the value is blank or is not a supported level
+  */
+ def validateCacheLevel(cacheLevel: String, tableProperties: Map[String, String]): Unit = {
+   val supportedCacheLevel = Seq("BLOCK", "BLOCKLET")
+   val normalizedCacheLevel = cacheLevel.trim.toUpperCase
+   if (normalizedCacheLevel.isEmpty) {
+     // the cache level is a single value and not a column list, so the message talks
+     // about an empty value
+     val errorMessage = "Invalid value: Empty value for the option(s): " +
+                        CarbonCommonConstants.CACHE_LEVEL
+     throw new MalformedCarbonCommandException(errorMessage)
+   }
+   if (!supportedCacheLevel.contains(normalizedCacheLevel)) {
+     val errorMessage = s"Invalid value: Allowed values for ${
+       CarbonCommonConstants.CACHE_LEVEL
+     } are ${ supportedCacheLevel.mkString(" AND ") }"
+     throw new MalformedCarbonCommandException(errorMessage)
+   }
+   tableProperties.put(CarbonCommonConstants.CACHE_LEVEL, normalizedCacheLevel)
+ }
+
+ /**
+  * Validates the columns configured for column meta cache and stores them in normalized form.
+  * An empty value is allowed; in that case nothing is validated and the table properties are
+  * left as they are. Otherwise the comma separated value is validated, each name is trimmed
+  * and lower-cased, and the names are written back to the table properties in the order in
+  * which they occur in `tableColumns`.
+  *
+  * @param dbName database name, used for the error message of the validation
+  * @param tableName table name, used for the error message of the validation
+  * @param tableColumns names of the columns of the table
+  * @param cachedColumns value configured for the column meta cache property
+  * @param tableProperties table properties that receive the normalized value
+  */
+ def validateColumnMetaCacheFields(dbName: String,
+     tableName: String,
+     tableColumns: Seq[String],
+     cachedColumns: String,
+     tableProperties: Map[String, String]): Unit = {
+   if (cachedColumns.nonEmpty) {
+     validateColumnMetaCacheOption(
+       TableIdentifier(tableName, Some(dbName)),
+       dbName,
+       cachedColumns,
+       tableColumns)
+     val requestedColumns = cachedColumns.split(",").map(_.trim.toLowerCase)
+     // keep the order of the table columns instead of the order given in the property
+     val inTableOrder = tableColumns.filter(requestedColumns.contains(_))
+     tableProperties.put(CarbonCommonConstants.COLUMN_META_CACHE, inTableOrder.mkString(","))
+   }
+ }
+
+ /**
+  * Validates the value of the column_meta_cache option in the table properties.
+  * The checks run in this order: the value must not consist of blanks only, no entry of the
+  * comma separated list may be blank, no column name may occur more than once and every
+  * column name must be contained in `tableColumns`. Names are trimmed and lower-cased before
+  * they are compared.
+  *
+  * @param tableIdentifier identifier whose table name is used in the error message
+  * @param databaseName database name used in the error message
+  * @param columnsToBeCached comma separated column names configured for the option
+  * @param tableColumns names of the columns of the table
+  * @throws MalformedCarbonCommandException if any of the checks fails
+  */
+ private def validateColumnMetaCacheOption(tableIdentifier: TableIdentifier,
+     databaseName: String,
+     columnsToBeCached: String,
+     tableColumns: Seq[String]): Unit = {
+   val emptyNameMessage = "Invalid value: Empty column names for the option(s): " +
+                          CarbonCommonConstants.COLUMN_META_CACHE
+   // check if only empty spaces are given in the property value
+   if (columnsToBeCached.trim.isEmpty) {
+     throw new MalformedCarbonCommandException(emptyNameMessage)
+   }
+   val columns: Array[String] = columnsToBeCached.split(',').map(_.trim.toLowerCase)
+   // blank entries are rejected before the duplicate check, otherwise two blank entries
+   // would be reported as a duplicate column with an empty name
+   if (columns.exists(_.isEmpty)) {
+     throw new MalformedCarbonCommandException(emptyNameMessage)
+   }
+   // check for duplicate column names
+   columns.groupBy(identity).foreach { case (name, occurrences) =>
+     if (occurrences.length > 1) {
+       throw new MalformedCarbonCommandException(s"Duplicate column name found : ${ name }")
+     }
+   }
+   // check if every column exists in the table
+   columns.find(col => !tableColumns.contains(col)).foreach { col =>
+     val errorMessage = s"Column $col does not exists in the table ${
+       databaseName
+     }.${ tableIdentifier.table }"
+     throw new MalformedCarbonCommandException(errorMessage)
+   }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index e534f5f..13d1ff7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -381,7 +381,36 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
// get partitionInfo
val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
-
+ if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+ // validate the column_meta_cache option
+ val tableColumns = dims.map(x => x.name.get) ++ msrs.map(x => x.name.get)
+ CommonUtil.validateColumnMetaCacheFields(tableName,
+ dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+ tableColumns,
+ tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+ tableProperties)
+ val columnsToBeCached = tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get
+ if (columnsToBeCached.nonEmpty) {
+ columnsToBeCached.split(",").foreach { column =>
+ val dimFieldToBeCached = dims.filter(x => x.name.get.equals(column))
+ // first element is taken as each column will have a unique name
+ // check for complex type column
+ if (dimFieldToBeCached.nonEmpty &&
+ isComplexDimDictionaryExclude(dimFieldToBeCached(0).dataType.get)) {
+ val errorMessage =
+ s"$column is a complex type column and complex type is not allowed for " +
+ s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+ throw new MalformedCarbonCommandException(errorMessage)
+ }
+ }
+ }
+ }
+ // validate the cache level
+ if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+ CommonUtil.validateCacheLevel(
+ tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
+ tableProperties)
+ }
// validate the tableBlockSize from table properties
CommonUtil.validateTableBlockSize(tableProperties)
// validate table level properties for compaction
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 3b56a35..23b5cba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -108,6 +108,8 @@ private[sql] case class CarbonDescribeFormattedCommand(
results ++= Seq(("SORT_SCOPE", tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants
.LOAD_SORT_SCOPE_DEFAULT), tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants
.LOAD_SORT_SCOPE_DEFAULT)))
+ // add Cache Level property
+ results ++= Seq(("CACHE_LEVEL", tblProps.getOrDefault("CACHE_LEVEL", "BLOCK"), ""))
val isStreaming = tblProps.asScala.getOrElse("streaming", "false")
results ++= Seq(("Streaming", isStreaming, ""))
val isLocalDictEnabled = tblProps.asScala
@@ -187,6 +189,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
relation.carbonTable.getTableName).asScala
.map(column => column).mkString(","), ""))
+ // add columns configured in column meta cache
+ if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) {
+ results ++=
+ Seq(("COLUMN_META_CACHE", carbonTable.getCachedColumns(carbonTable.getTableName).asScala
+ .map(col => col).mkString(","), ""))
+ }
if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
results ++=
Seq(("#Partition Information", "", ""),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index c291ae2..a6a730b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import java.util
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -29,14 +31,18 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.CommonUtil
object AlterTableUtil {
@@ -312,16 +318,22 @@ object AlterTableUtil {
(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
+ LOGGER.audit(s"Alter table newProperties request has been received for $dbName.$tableName")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
val timeStamp = 0L
- var carbonTable: CarbonTable = null
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ val lowerCasePropertiesMap: mutable.Map[String, String] = mutable.Map.empty
+ // convert all the keys to lower case
+ properties.foreach { entry =>
+ lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2)
+ }
+ // validate the required cache level properties
+ validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
// get the latest carbon table
// read the latest schema file
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
@@ -337,16 +349,19 @@ object AlterTableUtil {
wrapperTableInfo, dbName, tableName)
val tblPropertiesMap: mutable.Map[String, String] =
thriftTable.fact_table.getTableProperties.asScala
+ // below map will be used for cache invalidation. As tblProperties map is getting modified
+ // in the next few steps the original map need to be retained for any decision making
+ val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
if (set) {
- // This overrides old properties and update the comment parameter of thriftTable
- // with the newly added/modified comment since thriftTable also holds comment as its
- // direct property.
- properties.foreach { property => if (validateTableProperties(property._1)) {
- tblPropertiesMap.put(property._1.toLowerCase, property._2)
- } else { val errorMessage = "Error: Invalid option(s): " + property._1.toString()
+ // This overrides old properties and updates the comment parameter of thriftTable
+ // with the newly added/modified comment since thriftTable also holds comment as its
+ // direct property.
+ lowerCasePropertiesMap.foreach { property => if (validateTableProperties(property._1)) {
+ tblPropertiesMap.put(property._1, property._2)
+ } else {
+ val errorMessage = "Error: Invalid option(s): " + property._1.toString()
throw new MalformedCarbonCommandException(errorMessage)
- }
- }
+ }}
} else {
// This removes the comment parameter from thriftTable
// since thriftTable also holds comment as its property.
@@ -364,20 +379,232 @@ object AlterTableUtil {
thriftTable)(sparkSession)
catalog.alterTable(tableIdentifier, schemParts, cols)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
- LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
+ // check and clear the block/blocklet cache
+ checkAndClearBlockletCache(carbonTable,
+ existingTablePropertiesMap,
+ lowerCasePropertiesMap,
+ propKeys,
+ set)
+ LOGGER.info(s"Alter table newProperties is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table newProperties is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
- LOGGER.error(e, "Alter table properties failed")
- sys.error(s"Alter table properties operation failed: ${e.getMessage}")
+ LOGGER.error(e, "Alter table newProperties failed")
+ sys.error(s"Alter table newProperties operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
AlterTableUtil.releaseLocks(locks)
}
}
- def validateTableProperties(propKey: String): Boolean = {
- val supportedOptions = Seq("STREAMING", "COMMENT")
- supportedOptions.contains(propKey.toUpperCase)
+ /**
+  * Checks whether the given table property key is one of the options accepted by the alter
+  * table properties flow. The key is upper-cased before the comparison, so its case does not
+  * matter.
+  *
+  * @param propKey property key supplied by the user
+  * @return true if the key is a supported option, false otherwise
+  */
+ private def validateTableProperties(propKey: String): Boolean = {
+   val normalizedKey = propKey.toUpperCase
+   Seq("STREAMING", "COMMENT", "COLUMN_META_CACHE", "CACHE_LEVEL").exists(_ == normalizedKey)
+ }
+
+ /**
+  * Validates the column_meta_cache and cache_level properties when the user has supplied them.
+  * Neither property is accepted for a child datamap table.
+  *
+  * @param carbonTable   table whose properties are being altered
+  * @param propertiesMap properties supplied by the user; the map itself is handed on to the
+  *                      CommonUtil validators
+  * @throws MalformedCarbonCommandException if one of the two properties is configured on a child
+  *                                         datamap, or a complex type column is requested for
+  *                                         caching; the delegated validators may throw as well
+  */
+ private def validateColumnMetaCacheAndCacheLevel(carbonTable: CarbonTable,
+     propertiesMap: mutable.Map[String, String]): Unit = {
+   // validate column meta cache property
+   propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).foreach { requestedColumns =>
+     // Column meta cache is not allowed for child tables and dataMaps
+     if (carbonTable.isChildDataMap) {
+       throw new MalformedCarbonCommandException(s"Table property ${
+         CarbonCommonConstants.COLUMN_META_CACHE} is not allowed for child datamaps")
+     }
+     // every dimension and measure of the table is a candidate for caching
+     val tableColumns: Seq[String] = CarbonUtil
+       .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName),
+         carbonTable.getMeasureByTableName(carbonTable.getTableName))
+       .asScala
+       .map(_.getColumnName)
+     CommonUtil.validateColumnMetaCacheFields(carbonTable.getDatabaseName,
+       carbonTable.getTableName,
+       tableColumns,
+       requestedColumns,
+       propertiesMap)
+     // The property is looked up again after validation rather than reusing requestedColumns,
+     // because the map was handed to the validator.
+     // NOTE(review): presumably the validator can rewrite the entry - confirm
+     val columnsToBeCached = propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).get
+     validateForComplexTypeColumn(carbonTable, columnsToBeCached)
+   }
+   // validate cache level property
+   propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).foreach { requestedCacheLevel =>
+     // Cache level is not allowed for child tables and dataMaps
+     if (carbonTable.isChildDataMap) {
+       throw new MalformedCarbonCommandException(s"Table property ${
+         CarbonCommonConstants.CACHE_LEVEL} is not allowed for child datamaps")
+     }
+     CommonUtil.validateCacheLevel(requestedCacheLevel, propertiesMap)
+   }
+ }
+
+ /**
+  * Rejects the request when any of the columns to be cached is a complex type dimension.
+  *
+  * @param carbonTable   table used to look up each column as a dimension
+  * @param cachedColumns comma separated column names; each name is looked up exactly as it
+  *                      appears between the commas
+  * @throws MalformedCarbonCommandException for the first column found to be a complex dimension
+  */
+ private def validateForComplexTypeColumn(carbonTable: CarbonTable,
+     cachedColumns: String): Unit = {
+   // an empty value is skipped entirely, no lookup is attempted
+   if (cachedColumns.nonEmpty) {
+     val firstComplexColumn = cachedColumns.split(",").find { columnName =>
+       val dimension = carbonTable.getDimensionByName(carbonTable.getTableName, columnName)
+       // a null lookup result is not treated as an error here
+       null != dimension && dimension.isComplex
+     }
+     firstComplexColumn.foreach { column =>
+       val errorMessage =
+         s"$column is a complex type column and complex type is not allowed for " +
+         s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+       throw new MalformedCarbonCommandException(errorMessage)
+     }
+   }
+ }
+
+ /**
+  * Checks whether the altered caching properties require the driver block/blocklet cache of
+  * the table to be cleared, and clears it if so.
+  *
+  * @param carbonTable             table whose cache may have to be invalidated
+  * @param existingTableProperties table properties as they were before this alter operation
+  * @param newProperties           properties being set; only used when set is true
+  * @param propKeys                keys being unset; compared in lower case, only used when set
+  *                                is false
+  * @param set                     true for the set scenario, false for the unset scenario
+  */
+ private def checkAndClearBlockletCache(carbonTable: CarbonTable,
+     existingTableProperties: scala.collection.mutable.Map[String, String],
+     newProperties: mutable.Map[String, String],
+     propKeys: Seq[String],
+     set: Boolean): Unit = {
+   if (set) {
+     clearBlockletCacheForCachingProperties(carbonTable, existingTableProperties, newProperties)
+   } else {
+     // convert all the unset keys to lower case
+     val propertiesToBeRemoved = propKeys.map(key => key.toLowerCase)
+     // This is unset scenario and the cache needs to be cleaned only when
+     // 1. column_meta_cache property is getting unset and existing table properties contains
+     // this property
+     val columnMetaCacheRemoved =
+       propertiesToBeRemoved.contains(CarbonCommonConstants.COLUMN_META_CACHE) &&
+       existingTableProperties.contains(CarbonCommonConstants.COLUMN_META_CACHE)
+     // 2. cache_level property is being unset and existing table properties contains this
+     // property and the existing value is not equal to default value because after un-setting
+     // the property the cache should be loaded again with default value.
+     // The stored value itself is compared: an Option never equals a String, so comparing the
+     // Option wrapper would request an invalidation on every unset of an existing cache_level.
+     val nonDefaultCacheLevelRemoved =
+       propertiesToBeRemoved.contains(CarbonCommonConstants.CACHE_LEVEL) &&
+       existingTableProperties.get(CarbonCommonConstants.CACHE_LEVEL)
+         .exists(level => !level.equals(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE))
+     if (columnMetaCacheRemoved || nonDefaultCacheLevelRemoved) {
+       ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+     }
+   }
+ }
+
+ /**
+  * Invalidates the driver block/blocklet (BTree) cache when a newly supplied column_meta_cache
+  * or cache_level value differs from what the table had before. The two properties are checked
+  * one after the other and independently, so the cache may be invalidated once for each.
+  *
+  * @param carbonTable      table whose cache may have to be invalidated
+  * @param tblPropertiesMap table properties to compare the new values against
+  * @param newProperties    properties supplied by the user
+  */
+ private def clearBlockletCacheForCachingProperties(
+     carbonTable: CarbonTable,
+     tblPropertiesMap: scala.collection.mutable.Map[String, String],
+     newProperties: mutable.Map[String, String]): Unit = {
+   // column_meta_cache: clear the cache unless the very same columns are cached already
+   newProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).foreach { newColumnsToBeCached =>
+     val existingCachedColumns = tblPropertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE)
+     val alreadyCached =
+       checkIfColumnsAreAlreadyCached(carbonTable, existingCachedColumns, newColumnsToBeCached)
+     if (!alreadyCached) {
+       ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+     }
+   }
+   // cache_level: clear the cache unless the existing value equals the new one
+   newProperties.get(CarbonCommonConstants.CACHE_LEVEL).foreach { newCacheLevelValue =>
+     val existingCacheLevel = tblPropertiesMap.get(CarbonCommonConstants.CACHE_LEVEL)
+     if (!isCacheLevelValid(existingCacheLevel, newCacheLevelValue)) {
+       ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+     }
+   }
+ }
+
+ /**
+  * Tells whether the existing cache level is the same as the new cache level.
+  *
+  * @param existingCacheLevelValue cache level currently present in the table properties, if any
+  * @param newCacheLevelValue      cache level being requested
+  * @return true only when an existing value is present and equals the new value (the
+  *         comparison is case sensitive); false when they differ or no value exists
+  */
+ private def isCacheLevelValid(existingCacheLevelValue: Option[String],
+     newCacheLevelValue: String): Boolean = {
+   existingCacheLevelValue.exists(current => current.equals(newCacheLevelValue))
+ }
+
+ /**
+  * Compares the columns requested for caching with the columns cached at present. When no
+  * column_meta_cache value exists, all the dimension and measure columns of the table are taken
+  * as the currently cached set.
+  *
+  * @param carbonTable          table supplying the column list when no property value exists
+  * @param existingCacheColumns existing comma separated column_meta_cache value, if any
+  * @param newColumnsToBeCached comma separated columns requested for caching
+  * @return true if both lists have the same number of columns and every currently cached
+  *         column is among the requested ones; false otherwise
+  */
+ private def checkIfColumnsAreAlreadyCached(
+     carbonTable: CarbonTable,
+     existingCacheColumns: Option[String],
+     newColumnsToBeCached: String): Boolean = {
+   // property values are compared trimmed and in lower case
+   def normalize(commaSeparated: String): Array[String] =
+     commaSeparated.split(",").map(name => name.trim.toLowerCase)
+
+   val requestedColumns = normalize(newColumnsToBeCached)
+   val cachedColumns: Array[String] = existingCacheColumns.map(normalize).getOrElse {
+     // By default all the columns in the table will be cached, so the table columns stand in
+     // for the missing property; their names are used as returned by the schema
+     CarbonUtil
+       .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName),
+         carbonTable.getMeasureByTableName(carbonTable.getTableName))
+       .asScala
+       .map(_.getColumnName)
+       .toArray
+   }
+   compareColumns(cachedColumns, requestedColumns)
}
+
+ /**
+  * Compares the existing cached columns with the new columns to be cached. The order of the
+  * columns does not matter.
+  *
+  * @param existingCachedColumns columns cached at present
+  * @param newColumnsToBeCached  columns requested for caching
+  * @return true if both arrays have the same length and every existing column is contained in
+  *         the new columns; false otherwise
+  */
+ private def compareColumns(existingCachedColumns: Array[String],
+     newColumnsToBeCached: Array[String]): Boolean = {
+   existingCachedColumns.length == newColumnsToBeCached.length &&
+   existingCachedColumns.forall(column => newColumnsToBeCached.contains(column))
+ }
+
}