You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/28 11:07:58 UTC

carbondata git commit: [CARBONDATA-2647] [CARBONDATA-2648] Add support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table properties

Repository: carbondata
Updated Branches:
  refs/heads/master e7103397d -> a0350e100


[CARBONDATA-2647] [CARBONDATA-2648] Add support for COLUMN_META_CACHE and CACHE_LEVEL in create table and alter table properties

Things done as part of this PR

Support for configuring COLUMN_META_CACHE in create and alter table set properties DDL.
Support for configuring CACHE_LEVEL in create and alter table set properties DDL.
Describe formatted display support for COLUMN_META_CACHE and CACHE_LEVEL
  Any interfaces changed?
Create Table Syntax
CREATE TABLE [dbName].tableName (col1 String, col2 String, col3 int,…) STORED BY 'carbondata' TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')

Alter Table set properties Syntax
ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('COLUMN_META_CACHE'='col1,col2,…', 'CACHE_LEVEL'='BLOCKLET')

This closes #2418


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a0350e10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a0350e10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a0350e10

Branch: refs/heads/master
Commit: a0350e100e262fef85409b3b4fe2a488d688f706
Parents: e710339
Author: manishgupta88 <to...@gmail.com>
Authored: Tue Jun 26 20:11:14 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 28 16:37:43 2018 +0530

----------------------------------------------------------------------
 .../dictionary/ManageDictionaryAndBTree.java    |  18 ++
 .../core/constants/CarbonCommonConstants.java   |  12 +
 .../core/metadata/schema/table/CarbonTable.java |  38 +++
 ...ithColumnMetCacheAndCacheLevelProperty.scala | 168 ++++++++++++
 ...ithColumnMetCacheAndCacheLevelProperty.scala | 155 +++++++++++
 .../describeTable/TestDescribeTable.scala       |   4 +-
 .../carbondata/spark/util/CommonUtil.scala      |  91 ++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  31 ++-
 .../table/CarbonDescribeFormattedCommand.scala  |   8 +
 .../org/apache/spark/util/AlterTableUtil.scala  | 263 +++++++++++++++++--
 10 files changed, 766 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a7d6027..b54fb14 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -149,4 +149,22 @@ public class ManageDictionaryAndBTree {
     }
   }
 
+  /**
+   * This method will remove the BTree instances from LRU cache for all the segments.
+   * Segment names come from the table's load metadata; an empty metadata list is a no-op.
+   * @param carbonTable table whose metadata path and absolute identifier are used
+   */
+  public static void invalidateBTreeCache(CarbonTable carbonTable) {
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    if (loadMetadataDetails.length > 0) {
+      String[] segments = new String[loadMetadataDetails.length];
+      int loadCounter = 0;
+      for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+        segments[loadCounter++] = loadMetadataDetail.getLoadName();
+      }
+      invalidateBTreeCache(carbonTable.getAbsoluteTableIdentifier(), segments);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 118ff28..da40862 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1839,6 +1839,18 @@ public final class CarbonCommonConstants {
    *  the node minimum load data default value
    */
   public static final int CARBON_LOAD_MIN_SIZE_DEFAULT = 256;
+  /**
+   * property to be specified for caching min/max of required columns
+   */
+  public static final String COLUMN_META_CACHE = "column_meta_cache";
+  /**
+   * property to be specified for caching level (Block/Blocklet)
+   */
+  public static final String CACHE_LEVEL = "cache_level";
+  /**
+   * default value for cache level
+   */
+  public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
   private CarbonCommonConstants() {
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 68bd749..710c7af 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1165,4 +1165,42 @@ public class CarbonTable implements Serializable {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
     }
   }
+
+  /**
+   * Returns the COLUMN_META_CACHE entries that match a dimension or measure of the table.
+   * An unset or empty property yields an empty list.
+   * @param tableName key used to look up the dimension and measure lists of the table
+   * @return names from the property that equal a dimension or measure column name
+   */
+  public List<String> getCachedColumns(String tableName) {
+    List<String> cachedColsList = new ArrayList<>(tableDimensionsMap.size());
+    String cacheColumns =
+        tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.COLUMN_META_CACHE);
+    if (null != cacheColumns && !cacheColumns.isEmpty()) {
+      List<CarbonDimension> carbonDimensions = tableDimensionsMap.get(tableName);
+      List<CarbonMeasure> carbonMeasures = tableMeasuresMap.get(tableName);
+      String[] cachedCols = cacheColumns.split(",");
+      for (String column : cachedCols) {
+        boolean found = false;
+        // this will avoid adding the columns which have been dropped from the table
+        for (CarbonDimension dimension : carbonDimensions) {
+          if (dimension.getColName().equals(column)) {
+            cachedColsList.add(column);
+            found = true;
+            break;
+          }
+        }
+        // if column is not a dimension then check in measures
+        if (!found) {
+          for (CarbonMeasure measure : carbonMeasures) {
+            if (measure.getColName().equals(column)) {
+              cachedColsList.add(column);
+              break;
+            }
+          }
+        }
+      }
+    }
+    return cachedColsList;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..dbe9c75
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * test class for validating alter table set properties with alter_column_meta_cache and
+ * cache_level properties
+ */
+class TestAlterTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  private def dropTable = {
+    sql("drop table if exists alter_column_meta_cache")
+    sql("drop table if exists cache_level")
+  }
+
+  override def beforeAll {
+    // drop table
+    dropTable
+    // create table
+    sql("create table alter_column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>, c6 array<string>) stored by 'carbondata'")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+  }
+
+  test("validate column_meta_cache with only empty spaces - alter_column_meta_cache_01") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='    ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - alter_column_meta_cache_02") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('COLUMN_meta_CachE'='c2,c3')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between columns - alter_column_meta_cache_03") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,  ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid columns - alter_column_meta_cache_04") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - alter_column_meta_cache_05") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - alter_column_meta_cache_06") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - alter_column_meta_cache_07") {
+    intercept[RuntimeException] {
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - alter_column_meta_cache_08") {
+    intercept[RuntimeException] { // c6 is the array<string> column; c5 is the struct
+      sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c6,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - alter_column_meta_cache_09") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "alter_column_meta_cache", "column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is set - alter_column_meta_cache_10") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2')")
+    val descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, true, "column_meta_cache")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is already set - alter_column_meta_cache_11") {
+    sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c3')")
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate unsetting of column_meta_cache when column_meta_cache is not already set - alter_column_meta_cache_12") {
+    var descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+    sql("Alter table alter_column_meta_cache UNSET TBLPROPERTIES('column_meta_cache')")
+    descResult = sql("describe formatted alter_column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - ALTER_CACHE_LEVEL_01") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='    ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - ALTER_CACHE_LEVEL_02") {
+    intercept[RuntimeException] {
+      sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - ALTER_CACHE_LEVEL_03") {
+    sql("Alter table cache_level SET TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - ALTER_CACHE_LEVEL_04") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is set - ALTER_CACHE_LEVEL_05") {
+    sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is not set - ALTER_CACHE_LEVEL_06") {
+    sql("Alter table cache_level UNSET TBLPROPERTIES('cache_level')")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  override def afterAll: Unit = {
+    // drop table
+    dropTable
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
new file mode 100644
index 0000000..8dadef9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * test class for validating create table with column_meta_cache and cache_level properties
+ */
+class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+
+  private def isExpectedValueValid(dbName: String,
+      tableName: String,
+      key: String,
+      expectedValue: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
+    val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key)
+    expectedValue.equals(value)
+  }
+
+  test("validate column_meta_cache with only empty spaces - COLUMN_META_CACHE_01") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='    ')")
+    }
+  }
+
+  test("validate the property with characters in different cases - COLUMN_META_CACHE_02") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2"))
+  }
+
+  test("validate column_meta_cache with intermediate empty string between columns - COLUMN_META_CACHE_03") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,  ,c3')")
+    }
+  }
+
+  test("validate column_meta_cache with combination of valid and invalid columns - COLUMN_META_CACHE_04") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c10')")
+    }
+  }
+
+  test("validate column_meta_cache for dimensions and measures - COLUMN_META_CACHE_05") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c3,c2,c4')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c2,c3,c4"))
+  }
+
+  test("validate for duplicate column names - COLUMN_META_CACHE_06") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
+    }
+  }
+
+  test("validate column_meta_cache for complex struct type - COLUMN_META_CACHE_07") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 struct<imei:string, imsi:string>) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5')")
+    }
+  }
+
+  test("validate column_meta_cache for complex array type - COLUMN_META_CACHE_08") {
+    sql("drop table if exists column_meta_cache")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double, c5 array<string>) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c5,c2')")
+    }
+  }
+
+  test("validate column_meta_cache with empty value - COLUMN_META_CACHE_09") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='')")
+    assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", ""))
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is set - COLUMN_META_CACHE_10") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('COLUMN_meta_CachE'='c2')")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, true, "COLUMN_META_CACHE")
+  }
+
+  test("validate describe formatted command to display column_meta_cache when column_meta_cache is not set - COLUMN_META_CACHE_11") {
+    sql("drop table if exists column_meta_cache")
+    sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata'")
+    val descResult = sql("describe formatted column_meta_cache")
+    checkExistence(descResult, false, "COLUMN_META_CACHE")
+  }
+
+  test("validate cache_level with only empty spaces - CACHE_LEVEL_01") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='    ')")
+    }
+  }
+
+  test("validate cache_level with invalid values - CACHE_LEVEL_02") {
+    sql("drop table if exists cache_level")
+    intercept[MalformedCarbonCommandException] {
+      sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='xyz,abc')")
+    }
+  }
+
+  test("validate cache_level with property in different cases - CACHE_LEVEL_03") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('CACHE_leveL'='BLOcK')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCK"))
+  }
+
+  test("validate cache_level with default value as Blocklet - CACHE_LEVEL_04") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')")
+    assert(isExpectedValueValid("default", "cache_level", "cache_level", "BLOCKLET"))
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is set - CACHE_LEVEL_05") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata' TBLPROPERTIES('cache_level'='bloCKlet')")
+    val descResult = sql("describe formatted cache_level")
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+  test("validate describe formatted command to display cache_level when cache_level is not set - CACHE_LEVEL_06") {
+    sql("drop table if exists cache_level")
+    sql("create table cache_level(c1 String) stored by 'carbondata'")
+    val descResult = sql("describe formatted cache_level")
+    // even though not configured default cache level will be displayed as BLOCK
+    checkExistence(descResult, true, "CACHE_LEVEL")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
index 5598457..ceb0ac3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
@@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
   test("test describe formatted table desc1") {
 
     val resultCol = Seq("", "", "##Detailed Column property", "##Detailed Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database Name", "Last Update Time",
-    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
+    "SORT_COLUMNS", "SORT_SCOPE", "CACHE_LEVEL", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
     val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s"))
     checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow)
-    assert(sql("desc formatted desc1").count() == 22)
+    assert(sql("desc formatted desc1").count() == 23)
   }
 
   test("test describe formatted for partition table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 3995aa7..4723e6b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -26,11 +26,11 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 import scala.util.Random
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
@@ -896,4 +896,93 @@ object CommonUtil {
     CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
   }
 
+  /**
+   * Validates the CACHE_LEVEL table property and stores its normalized form.
+   * The value is trimmed and upper-cased; only BLOCK and BLOCKLET are accepted.
+   *
+   * @param cacheLevel      raw property value from the DDL
+   * @param tableProperties mutable property map; receives the normalized value on success
+   * @throws MalformedCarbonCommandException if the value is blank or not a supported level
+   */
+  def validateCacheLevel(cacheLevel: String, tableProperties: Map[String, String]): Unit = {
+    val supportedCacheLevel = Seq("BLOCK", "BLOCKLET")
+    val trimmedCacheLevel = cacheLevel.trim.toUpperCase
+    if (trimmedCacheLevel.isEmpty) {
+      // a blank value is not a column list, so report it as an empty value
+      throw new MalformedCarbonCommandException(
+        s"Invalid value: Empty value for the option(s): ${ CarbonCommonConstants.CACHE_LEVEL }")
+    }
+    if (!supportedCacheLevel.contains(trimmedCacheLevel)) {
+      throw new MalformedCarbonCommandException(s"Invalid value: Allowed values for ${
+        CarbonCommonConstants.CACHE_LEVEL } are BLOCK AND BLOCKLET")
+    }
+    tableProperties.put(CarbonCommonConstants.CACHE_LEVEL, trimmedCacheLevel)
+  }
+
+  /**
+   * This will validate the column meta cache i.e the columns to be cached.
+   * An empty value ("") is allowed and skips validation, leaving the property untouched.
+   * Otherwise the value is validated, each name is trimmed and lower-cased, and the
+   * property is rewritten with the matching columns in the order given by tableColumns.
+   * Valid values for COLUMN_META_CACHE can either be empty or can have one or more comma
+   * separated values
+   */
+  def validateColumnMetaCacheFields(dbName: String,
+      tableName: String,
+      tableColumns: Seq[String],
+      cachedColumns: String,
+      tableProperties: Map[String, String]): Unit = {
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    // below check is added because empty value for column_meta_cache is allowed and in that
+    // case there should not be any validation
+    if (!cachedColumns.equals("")) {
+      validateColumnMetaCacheOption(tableIdentifier, dbName, cachedColumns, tableColumns)
+      val columnsToBeCached = cachedColumns.split(",").map(x => x.trim.toLowerCase).toSeq
+      // make the columns in create table order and then add it to table properties
+      val createOrder = tableColumns.filter(col => columnsToBeCached.contains(col))
+      tableProperties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(","))
+    }
+  }
+
+  /**
+   * Validate the column_meta_cache option in tableproperties.
+   * Rejects blank values, empty entries, duplicate names and names missing from tableColumns.
+   * @param tableIdentifier   identifier whose table name is used in error messages
+   * @param databaseName      database name used in error messages
+   * @param columnsToBeCached comma separated column names given for the property
+   * @param tableColumns      column names of the table; compared against lower-cased input
+   */
+  private def validateColumnMetaCacheOption(tableIdentifier: TableIdentifier,
+      databaseName: String,
+      columnsToBeCached: String,
+      tableColumns: Seq[String]): Unit = {
+    val emptyColumnMessage = "Invalid value: Empty column names for the option(s): " +
+                             CarbonCommonConstants.COLUMN_META_CACHE
+    // check if only empty spaces are given in the property value
+    if (columnsToBeCached.trim.isEmpty) {
+      throw new MalformedCarbonCommandException(emptyColumnMessage)
+    }
+    val columns: Array[String] = columnsToBeCached.split(',').map(x => x.toLowerCase.trim)
+    // check for empty intermediate columns first, otherwise two blank entries would be
+    // reported as a duplicate column with an empty name
+    if (columns.exists(col => col.isEmpty)) {
+      throw new MalformedCarbonCommandException(emptyColumnMessage)
+    }
+    // Check for duplicate column names
+    columns.groupBy(identity).foreach { case (name, occurrences) =>
+      if (occurrences.length > 1) {
+        throw new MalformedCarbonCommandException(s"Duplicate column name found : $name")
+      }
+    }
+    // check if every column exists in the table
+    columns.foreach { col =>
+      if (!tableColumns.contains(col)) {
+        val errorMessage = s"Column $col does not exists in the table ${
+          databaseName
+        }.${ tableIdentifier.table }"
+        throw new MalformedCarbonCommandException(errorMessage)
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index e534f5f..13d1ff7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -381,7 +381,36 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
     // get partitionInfo
     val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
-
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+      // validate the column_meta_cache option
+      val tableColumns = dims.map(x => x.name.get) ++ msrs.map(x => x.name.get)
+      // NOTE(review): AlterTableUtil passes (databaseName, tableName) as the first two
+      // arguments of this method, here the order is (tableName, dbName) - confirm against the
+      // method signature, the arguments end up in the error message
+      CommonUtil.validateColumnMetaCacheFields(tableName,
+        dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+        tableColumns,
+        tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+        tableProperties)
+      // the value is read again after the validation call, which also receives the map
+      val columnsToBeCached = tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get
+      if (columnsToBeCached.nonEmpty) {
+        // the property value may carry spaces around the names or differ in case, so the
+        // names are trimmed and matched ignoring case; otherwise a complex column given as
+        // " col" would not be found in dims and the check below would be skipped silently
+        columnsToBeCached.split(",").map(_.trim).foreach { column =>
+          val dimFieldToBeCached = dims.filter(x => x.name.get.equalsIgnoreCase(column))
+          // first element is taken as each column will have a unique name
+          // check for complex type column
+          if (dimFieldToBeCached.nonEmpty &&
+              isComplexDimDictionaryExclude(dimFieldToBeCached(0).dataType.get)) {
+            val errorMessage =
+              s"$column is a complex type column and complex type is not allowed for " +
+              s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+            throw new MalformedCarbonCommandException(errorMessage)
+          }
+        }
+      }
+    }
+    // validate the cache level
+    if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+      CommonUtil.validateCacheLevel(
+        tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
+        tableProperties)
+    }
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
     // validate table level properties for compaction

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 3b56a35..23b5cba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -108,6 +108,8 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("SORT_SCOPE", tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants
       .LOAD_SORT_SCOPE_DEFAULT), tblProps.asScala.getOrElse("sort_scope", CarbonCommonConstants
       .LOAD_SORT_SCOPE_DEFAULT)))
+    // add Cache Level property. The key constant is used for the lookup because the alter
+    // table flow stores property keys in lower case, which an upper case literal never matches
+    results ++= Seq(("CACHE_LEVEL",
+      tblProps.getOrDefault(CarbonCommonConstants.CACHE_LEVEL, "BLOCK"), ""))
     val isStreaming = tblProps.asScala.getOrElse("streaming", "false")
     results ++= Seq(("Streaming", isStreaming, ""))
     val isLocalDictEnabled = tblProps.asScala
@@ -187,6 +189,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
       relation.carbonTable.getTableName).asScala
       .map(column => column).mkString(","), ""))
+    // add columns configured in column meta cache
+    if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) {
+      results ++=
+      Seq(("COLUMN_META_CACHE", carbonTable.getCachedColumns(carbonTable.getTableName).asScala
+        .map(col => col).mkString(","), ""))
+    }
     if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
       results ++=
       Seq(("#Partition Information", "", ""),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0350e10/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index c291ae2..a6a730b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -29,14 +31,18 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.CommonUtil
 
 object AlterTableUtil {
 
@@ -312,16 +318,22 @@ object AlterTableUtil {
     (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
     val tableName = tableIdentifier.table
     val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
+    LOGGER.audit(s"Alter table newProperties request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     val timeStamp = 0L
-    var carbonTable: CarbonTable = null
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val lowerCasePropertiesMap: mutable.Map[String, String] = mutable.Map.empty
+      // convert all the keys to lower case
+      properties.foreach { entry =>
+        lowerCasePropertiesMap.put(entry._1.toLowerCase, entry._2)
+      }
+      // validate the required cache level properties
+      validateColumnMetaCacheAndCacheLevel(carbonTable, lowerCasePropertiesMap)
       // get the latest carbon table
       // read the latest schema file
       val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
@@ -337,16 +349,19 @@ object AlterTableUtil {
         wrapperTableInfo, dbName, tableName)
       val tblPropertiesMap: mutable.Map[String, String] =
         thriftTable.fact_table.getTableProperties.asScala
+      // below map will be used for cache invalidation. As tblProperties map is getting modified
+      // in the next few steps the original map need to be retained for any decision making
+      val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
       if (set) {
-        //       This overrides old properties and update the comment parameter of thriftTable
-        //       with the newly added/modified comment since thriftTable also holds comment as its
-        //       direct property.
-        properties.foreach { property => if (validateTableProperties(property._1)) {
-          tblPropertiesMap.put(property._1.toLowerCase, property._2)
-        } else { val errorMessage = "Error: Invalid option(s): " + property._1.toString()
+        // This overrides the old properties and updates the comment parameter of thriftTable
+        // with the newly added/modified comment since thriftTable also holds comment as its
+        // direct property.
+        lowerCasePropertiesMap.foreach { property => if (validateTableProperties(property._1)) {
+          tblPropertiesMap.put(property._1, property._2)
+        } else {
+          val errorMessage = "Error: Invalid option(s): " + property._1.toString()
           throw new MalformedCarbonCommandException(errorMessage)
-        }
-        }
+        }}
       } else {
         // This removes the comment parameter from thriftTable
         // since thriftTable also holds comment as its property.
@@ -364,20 +379,232 @@ object AlterTableUtil {
         thriftTable)(sparkSession)
       catalog.alterTable(tableIdentifier, schemParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
-      LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
+      // check and clear the block/blocklet cache
+      checkAndClearBlockletCache(carbonTable,
+        existingTablePropertiesMap,
+        lowerCasePropertiesMap,
+        propKeys,
+        set)
+      LOGGER.info(s"Alter table newProperties is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table newProperties is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error(e, "Alter table properties failed")
-        sys.error(s"Alter table properties operation failed: ${e.getMessage}")
+        LOGGER.error(e, "Alter table newProperties failed")
+        sys.error(s"Alter table newProperties operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)
     }
   }
 
-  def validateTableProperties(propKey: String): Boolean = {
-    val supportedOptions = Seq("STREAMING", "COMMENT")
-   supportedOptions.contains(propKey.toUpperCase)
+  /**
+   * Tells whether the given table property key is one of the options that the alter table
+   * properties command accepts. The case of the key is ignored.
+   *
+   * @param propKey property key given by the user
+   * @return true if the key is a supported option, false otherwise
+   */
+  private def validateTableProperties(propKey: String): Boolean = {
+    propKey.toUpperCase match {
+      case "STREAMING" | "COMMENT" | "COLUMN_META_CACHE" | "CACHE_LEVEL" => true
+      case _ => false
+    }
+  }
+
+  /**
+   * Validates the column_meta_cache and cache_level properties when the user has configured
+   * them. A property that is not present in the map is not validated.
+   *
+   * @param carbonTable table on which the properties are being set
+   * @param propertiesMap properties given by the user, looked up by the constant keys
+   * @throws MalformedCarbonCommandException if one of the two properties is given for a
+   *         child datamap table
+   */
+  private def validateColumnMetaCacheAndCacheLevel(carbonTable: CarbonTable,
+      propertiesMap: mutable.Map[String, String]): Unit = {
+    // both properties are not allowed for child tables and dataMaps
+    def rejectForChildDataMap(propertyName: String): Unit = {
+      if (carbonTable.isChildDataMap) {
+        throw new MalformedCarbonCommandException(
+          s"Table property $propertyName is not allowed for child datamaps")
+      }
+    }
+
+    // validate column meta cache property
+    propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).foreach { cachedColumns =>
+      rejectForChildDataMap(CarbonCommonConstants.COLUMN_META_CACHE)
+      val factTableName = carbonTable.getTableName
+      val schemaList: util.List[ColumnSchema] = CarbonUtil.getColumnSchemaList(
+        carbonTable.getDimensionByTableName(factTableName),
+        carbonTable.getMeasureByTableName(factTableName))
+      val tableColumns: Seq[String] = schemaList.asScala.map(_.getColumnName)
+      CommonUtil.validateColumnMetaCacheFields(carbonTable.getDatabaseName,
+        factTableName,
+        tableColumns,
+        cachedColumns,
+        propertiesMap)
+      // the value is read again after the validation call, which also receives the map
+      validateForComplexTypeColumn(carbonTable,
+        propertiesMap(CarbonCommonConstants.COLUMN_META_CACHE))
+    }
+    // validate cache level property
+    propertiesMap.get(CarbonCommonConstants.CACHE_LEVEL).foreach { cacheLevel =>
+      rejectForChildDataMap(CarbonCommonConstants.CACHE_LEVEL)
+      CommonUtil.validateCacheLevel(cacheLevel, propertiesMap)
+    }
+  }
+
+  /**
+   * Rejects complex type columns in the column_meta_cache property value.
+   *
+   * Every comma separated name is trimmed and lower cased before the dimension lookup, so a
+   * value such as "col1, col2" cannot bypass the check because of the space. Names that do
+   * not resolve to a dimension of the table are ignored by this method.
+   *
+   * @param carbonTable table whose dimensions are looked up
+   * @param cachedColumns comma separated column names configured for column_meta_cache
+   * @throws MalformedCarbonCommandException if one of the columns is a complex type dimension
+   */
+  private def validateForComplexTypeColumn(carbonTable: CarbonTable,
+      cachedColumns: String): Unit = {
+    cachedColumns.split(",").map(_.trim).filter(_.nonEmpty).foreach { column =>
+      // the lookup can yield null when no dimension has this name, hence the Option wrapper
+      val dimension =
+        Option(carbonTable.getDimensionByName(carbonTable.getTableName, column.toLowerCase))
+      if (dimension.exists(_.isComplex)) {
+        val errorMessage =
+          s"$column is a complex type column and complex type is not allowed for " +
+          s"the option(s): ${ CarbonCommonConstants.COLUMN_META_CACHE }"
+        throw new MalformedCarbonCommandException(errorMessage)
+      }
+    }
+  }
+
+  /**
+   * This method will check and clear the driver block/blocklet cache after the table
+   * properties have been altered.
+   *
+   * For a set operation the decision is delegated to clearBlockletCacheForCachingProperties.
+   * For an unset operation the cache is cleared only when
+   * 1. column_meta_cache is being unset and the existing table properties contain it, or
+   * 2. cache_level is being unset, the existing table properties contain it and its value
+   * differs from the default value, because after un-setting the property the cache has to be
+   * loaded again with the default value
+   *
+   * @param carbonTable table whose cache may have to be cleared
+   * @param existingTableProperties table properties as they were before the alter operation
+   * @param newProperties properties given by the user for a set operation
+   * @param propKeys property keys given by the user for an unset operation
+   * @param set true for set tblproperties, false for unset tblproperties
+   */
+  private def checkAndClearBlockletCache(carbonTable: CarbonTable,
+      existingTableProperties: scala.collection.mutable.Map[String, String],
+      newProperties: mutable.Map[String, String],
+      propKeys: Seq[String],
+      set: Boolean): Unit = {
+    if (set) {
+      clearBlockletCacheForCachingProperties(carbonTable, existingTableProperties, newProperties)
+    } else {
+      // convert all the unset keys to lower case
+      val propertiesToBeRemoved = propKeys.map(_.toLowerCase)
+      val columnMetaCacheRemoved =
+        propertiesToBeRemoved.contains(CarbonCommonConstants.COLUMN_META_CACHE) &&
+        existingTableProperties.contains(CarbonCommonConstants.COLUMN_META_CACHE)
+      // the value inside the Option has to be compared with the default; comparing the
+      // Option itself with a String is never equal and would clear the cache every time
+      val nonDefaultCacheLevelRemoved =
+        propertiesToBeRemoved.contains(CarbonCommonConstants.CACHE_LEVEL) &&
+        existingTableProperties.get(CarbonCommonConstants.CACHE_LEVEL).exists { level =>
+          !level.equalsIgnoreCase(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE)
+        }
+      if (columnMetaCacheRemoved || nonDefaultCacheLevelRemoved) {
+        ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+      }
+    }
+  }
+
+  /**
+   * Compares the newly set column_meta_cache and cache_level values with the existing table
+   * properties and clears the driver block/blocklet cache for every property whose value
+   * has changed. A property that is not part of the new properties is left alone.
+   *
+   * @param carbonTable table whose cache may have to be cleared
+   * @param tblPropertiesMap table properties as they were before the alter operation
+   * @param newProperties properties given by the user
+   */
+  private def clearBlockletCacheForCachingProperties(
+      carbonTable: CarbonTable,
+      tblPropertiesMap: scala.collection.mutable.Map[String, String],
+      newProperties: mutable.Map[String, String]): Unit = {
+    // column_meta_cache: clear the BTree cache unless the same columns are cached already
+    newProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).foreach { newColumns =>
+      val existingColumns = tblPropertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE)
+      val alreadyCached = checkIfColumnsAreAlreadyCached(carbonTable, existingColumns, newColumns)
+      if (!alreadyCached) {
+        ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+      }
+    }
+    // cache_level: clear the BTree cache unless the existing level equals the new level
+    newProperties.get(CarbonCommonConstants.CACHE_LEVEL).foreach { newCacheLevelValue =>
+      val existingCacheLevel = tblPropertiesMap.get(CarbonCommonConstants.CACHE_LEVEL)
+      if (!isCacheLevelValid(existingCacheLevel, newCacheLevelValue)) {
+        ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable)
+      }
+    }
+  }
+
+  /**
+   * Method to verify if the existing cache level is same as the new cache level
+   *
+   * @param existingCacheLevelValue cache level currently stored in the table properties, if any
+   * @param newCacheLevelValue cache level given by the user
+   * @return true only if an existing value is present and equals the new value
+   */
+  private def isCacheLevelValid(existingCacheLevelValue: Option[String],
+      newCacheLevelValue: String): Boolean = {
+    existingCacheLevelValue.exists(existingValue => existingValue.equals(newCacheLevelValue))
+  }
+
+  /**
+   * Check the new columns to be cached with the already cached columns. If the count of new
+   * columns and already cached columns is same and all the already cached columns are part
+   * of the new columns then true will be returned else false. When the table has no
+   * column_meta_cache property, the new columns are compared with all the dimension and
+   * measure columns of the table.
+   *
+   * @param carbonTable table whose columns are used when the property is not configured yet
+   * @param existingCacheColumns existing column_meta_cache value of the table, if any
+   * @param newColumnsToBeCached comma separated columns given in the new property value
+   * @return true if the columns match, false otherwise
+   */
+  private def checkIfColumnsAreAlreadyCached(
+      carbonTable: CarbonTable,
+      existingCacheColumns: Option[String],
+      newColumnsToBeCached: String): Boolean = {
+    // names of a property value are compared trimmed and in lower case
+    def normalize(commaSeparated: String): Array[String] = {
+      commaSeparated.split(",").map(name => name.trim.toLowerCase)
+    }
+
+    val requestedColumns = normalize(newColumnsToBeCached)
+    val currentColumns: Array[String] = existingCacheColumns.map(normalize).getOrElse {
+      // By default all the columns in the table will be cached, so all the table columns are
+      // compared to the newly specified columns
+      val tableName = carbonTable.getTableName
+      val schemaList: util.List[ColumnSchema] = CarbonUtil.getColumnSchemaList(
+        carbonTable.getDimensionByTableName(tableName),
+        carbonTable.getMeasureByTableName(tableName))
+      val tableColumns: Array[String] = schemaList.asScala.map(_.getColumnName).toArray
+      tableColumns
+    }
+    compareColumns(currentColumns, requestedColumns)
   }
+
+  /**
+   * compare the existing cache columns and the new columns to be cached
+   *
+   * @param existingCachedColumns columns that are cached at present
+   * @param newColumnsToBeCached columns requested to be cached
+   * @return true if both arrays have the same length and every existing column is contained
+   *         in the new columns, false otherwise
+   */
+  private def compareColumns(existingCachedColumns: Array[String],
+      newColumnsToBeCached: Array[String]): Boolean = {
+    existingCachedColumns.length == newColumnsToBeCached.length &&
+    existingCachedColumns.forall(column => newColumnsToBeCached.contains(column))
+  }
+
 }