You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/06/19 15:38:58 UTC

carbondata git commit: Support adding local dictionary configuration in create table statement and show the configs in describe formatted table

Repository: carbondata
Updated Branches:
  refs/heads/master ca466d9f4 -> be20fefbe


Support adding local dictionary configuration in create table statement and show the configs in describe formatted table

What changes were proposed in this pull request?
In this PR, in order to support local dictionary,

create table changes are made to support local dictionary configurations as table properties
show local dictionary properties in describe formatted command based on whether the local dictionary enabled or disabled.
Highlights:
basically we will have four properties

LOCAL_DICT_ENABLE => whether to enable or disable local dictionary
LOCAL_DICT_THRESHOLD => threshold property for the column to generate local dictionary
LOCAL_DICT_INCLUDE => columns for which local dictionary needs to be generated
LOCAL_DICT_EXCLUDE => columns for which local dictionary should not be generated

This closes#2375


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/be20fefb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/be20fefb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/be20fefb

Branch: refs/heads/master
Commit: be20fefbe0a01a501a567683ecc872e08f3fc001
Parents: ca466d9
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Jun 6 20:33:39 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Tue Jun 19 21:08:27 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 +++++
 .../ThriftWrapperSchemaConverterImpl.java       |   5 +
 .../core/metadata/schema/table/CarbonTable.java |  68 +++++++++
 .../schema/table/column/ColumnSchema.java       |  19 +++
 .../apache/carbondata/core/util/CarbonUtil.java | 108 ++++++++++++++
 .../describeTable/TestDescribeTable.scala       |   4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 139 ++++++++++++++++++-
 .../command/carbonTableSchemaCommon.scala       |  13 +-
 .../table/CarbonDescribeFormattedCommand.scala  |  38 ++++-
 10 files changed, 417 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 355bcb6..5f06d08 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -910,6 +910,37 @@ public final class CarbonCommonConstants {
   public static final String COLUMN_GROUPS = "column_groups";
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
+
+  /**
+   * Table property to enable or disable local dictionary generation
+   */
+  public static final String LOCAL_DICTIONARY_ENABLE = "local_dictionary_enable";
+
+  /**
+   * default value for local dictionary generation
+   */
+  public static final String LOCAL_DICTIONARY_ENABLE_DEFAULT = "true";
+
+  /**
+   * Threshold value for local dictionary
+   */
+  public static final String LOCAL_DICTIONARY_THRESHOLD = "local_dictionary_threshold";
+
+  /**
+   * default value for local dictionary
+   */
+  public static final String LOCAL_DICTIONARY_THRESHOLD_DEFAULT = "1000";
+
+  /**
+   * Table property to specify the columns for which local dictionary needs to be generated.
+   */
+  public static final String LOCAL_DICTIONARY_INCLUDE = "local_dictionary_include";
+
+  /**
+   * Table property to specify the columns for which local dictionary should not be to be generated.
+   */
+  public static final String LOCAL_DICTIONARY_EXCLUDE = "local_dictionary_exclude";
+
   /**
    * key for dictionary path
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index f03b997..12f5fc3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Thrift schema to carbon schema converter and vice versa
@@ -594,6 +595,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         .getTable_columns()) {
       listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema));
     }
+    if (null != externalTableSchema.tableProperties) {
+      CarbonUtil
+          .setLocalDictColumnsToWrapperSchema(listOfColumns, externalTableSchema.tableProperties);
+    }
     wrapperTableSchema.setListOfColumns(listOfColumns);
     wrapperTableSchema.setSchemaEvolution(
         fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f48ada0..b7bef28 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -151,6 +151,16 @@ public class CarbonTable implements Serializable {
   private boolean hasDataMapSchema;
 
   /**
+   * is local dictionary generation enabled for the table
+   */
+  private boolean isLocalDictionaryEnabled;
+
+  /**
+   * local dictionary generation threshold
+   */
+  private int localDictionaryThreshold;
+
+  /**
    * The boolean field which points if the data written for Non Transactional Table
    * or Transactional Table.
    * transactional table means carbon will provide transactional support when user doing data
@@ -468,6 +478,37 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * is local dictionary enabled for the table
+   * @return
+   */
+  public boolean isLocalDictionaryEnabled() {
+    return isLocalDictionaryEnabled;
+  }
+
+  /**
+   * set whether local dictionary enabled or not
+   * @param localDictionaryEnabled
+   */
+  public void setLocalDictionaryEnabled(boolean localDictionaryEnabled) {
+    isLocalDictionaryEnabled = localDictionaryEnabled;
+  }
+
+  /**
+   * @return local dictionary generation threshold
+   */
+  public int getLocalDictionaryThreshold() {
+    return localDictionaryThreshold;
+  }
+
+  /**
+   * set the local dictionary generation threshold
+   * @param localDictionaryThreshold
+   */
+  public void setLocalDictionaryThreshold(int localDictionaryThreshold) {
+    this.localDictionaryThreshold = localDictionaryThreshold;
+  }
+
+  /**
    * build table unique name
    * all should call this method to build table unique name
    * @param databaseName
@@ -1045,5 +1086,32 @@ public class CarbonTable implements Serializable {
     }
     table.hasDataMapSchema =
         null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
+    setLocalDictInfo(table, tableInfo);
+  }
+
+  /**
+   * This method sets whether the local dictionary is enabled or not, and the local dictionary
+   * threshold, if not defined default value are considered.
+   * @param table
+   * @param tableInfo
+   */
+  private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
+    String isLocalDictionaryEnabled = tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictionaryThreshold = tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
+    if (null != isLocalDictionaryEnabled) {
+      table.setLocalDictionaryEnabled(Boolean.parseBoolean(isLocalDictionaryEnabled));
+      if (null != localDictionaryThreshold) {
+        table.setLocalDictionaryThreshold(Integer.parseInt(localDictionaryThreshold));
+      } else {
+        table.setLocalDictionaryThreshold(
+            Integer.parseInt(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT));
+      }
+    } else {
+      // in case of old tables, local dictionary enable property will not be present in
+      // tableProperties, so disable the local dictionary generation
+      table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index fb4d8e3..786e873 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -138,6 +138,25 @@ public class ColumnSchema implements Serializable, Writable {
   private String timeSeriesFunction = "";
 
   /**
+   * set whether the column is local dictionary column or not.
+   */
+  private boolean isLocalDictColumn = false;
+
+  /**
+   * @return isLocalDictColumn
+   */
+  public boolean isLocalDictColumn() {
+    return isLocalDictColumn;
+  }
+
+  /**
+   * @param localDictColumn whether column is local dictionary column
+   */
+  public void setLocalDictColumn(boolean localDictColumn) {
+    isLocalDictColumn = localDictColumn;
+  }
+
+  /**
    * @return the columnName
    */
   public String getColumnName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 836b193..2f34163 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3004,5 +3004,113 @@ public final class CarbonUtil {
     }
     return blockId;
   }
+
+  /**
+   * sets the local dictionary columns to wrapper schema, if the table property
+   * local_dictionary_include is defined, then those columns will be set as local dictionary
+   * columns, if not, all the no dictionary string datatype columns are set as local dictionary
+   * columns.
+   * Handling for complexTypes::
+   *    Since the column structure will be flat
+   *    if the parent column is configured as local Dictionary column, then it gets the child column
+   *    count and then sets the primitive child column as local dictionary column if it is string
+   *    datatype column
+   * Handling for both localDictionary Include and exclude columns:
+   * There will be basically four scenarios which are
+   * -------------------------------------------------------
+   * | Local_Dictionary_include | Local_Dictionary_Exclude |
+   * -------------------------------------------------------
+   * |   Not Defined            |     Not Defined          |
+   * |   Not Defined            |      Defined             |
+   * |   Defined                |     Not Defined          |
+   * |   Defined                |      Defined             |
+   * -------------------------------------------------------
+   * 1. when the both local dictionary include and exclude is not defined, then set all the no
+   * dictionary string datatype columns as local dictionary generate columns
+   * 2. set all the no dictionary string datatype columns as local dictionary columns except the
+   * columns present in local dictionary exclude
+   * 3. & 4. when local dictionary include is defined, no need to check dictionary exclude columns
+   * configured or not, we just need to set only the columns present in local dictionary include as
+   * local dictionary columns
+   *
+   * @param columns
+   * @param mainTableProperties
+   */
+  public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns,
+      Map<String, String> mainTableProperties) {
+    String isLocalDictEnabledForMainTable =
+        mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictIncludeColumnsOfMainTable =
+        mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE);
+    String localDictExcludeColumnsOfMainTable =
+        mainTableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE);
+    String[] listOfDictionaryIncludeColumns = null;
+    String[] listOfDictionaryExcludeColumns = null;
+    if (null != isLocalDictEnabledForMainTable && Boolean
+        .parseBoolean(isLocalDictEnabledForMainTable)) {
+      int childColumnCount = 0;
+      for (ColumnSchema column : columns) {
+        // for complex type columns, user gives the parent column as local dictionary column and
+        // only the string primitive type child column will be set as local dictionary column in the
+        // schema
+        if (childColumnCount > 0) {
+          if (column.getDataType().equals(DataTypes.STRING)) {
+            column.setLocalDictColumn(true);
+            childColumnCount -= 1;
+          } else {
+            childColumnCount -= 1;
+          }
+        }
+        // if complex column is defined in local dictionary include column, then get the child
+        // columns and set the string datatype child type as local dictionary column
+        if (column.getNumberOfChild() > 0 && null != localDictIncludeColumnsOfMainTable) {
+          listOfDictionaryIncludeColumns = localDictIncludeColumnsOfMainTable.split(",");
+          for (String dictColumn : listOfDictionaryIncludeColumns) {
+            if (dictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+              childColumnCount = column.getNumberOfChild();
+            }
+          }
+        }
+        if (null == localDictIncludeColumnsOfMainTable) {
+          // if local dictionary exclude columns is not defined, then set all the no dictionary
+          // string datatype column
+          if (null == localDictExcludeColumnsOfMainTable) {
+            // column should be no dictionary string datatype column
+            if (column.isDimensionColumn() && column.getDataType().equals(DataTypes.STRING)
+                && !column.hasEncoding(Encoding.DICTIONARY)) {
+              column.setLocalDictColumn(true);
+            }
+            // if local dictionary exclude columns is defined, then set for all no dictionary string
+            // datatype columns except excluded columns
+          } else {
+            if (column.isDimensionColumn() && column.getDataType().equals(DataTypes.STRING)
+                && !column.hasEncoding(Encoding.DICTIONARY)) {
+              listOfDictionaryExcludeColumns = localDictExcludeColumnsOfMainTable.split(",");
+              for (String excludeDictColumn : listOfDictionaryExcludeColumns) {
+                if (!excludeDictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+                  column.setLocalDictColumn(true);
+                }
+              }
+            }
+          }
+        } else {
+          // if local dict columns alre not configured, set for all no dictionary string datatype
+          // column
+          if (column.isDimensionColumn() && column.getDataType().equals(DataTypes.STRING) && !column
+              .hasEncoding(Encoding.DICTIONARY) && localDictIncludeColumnsOfMainTable.toLowerCase()
+              .contains(column.getColumnName().toLowerCase())) {
+            if (null == listOfDictionaryIncludeColumns) {
+              listOfDictionaryIncludeColumns = localDictIncludeColumnsOfMainTable.split(",");
+            }
+            for (String dictColumn : listOfDictionaryIncludeColumns) {
+              if (dictColumn.trim().equalsIgnoreCase(column.getColumnName())) {
+                column.setLocalDictColumn(true);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
index 1e333ee..5598457 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala
@@ -51,10 +51,10 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
   test("test describe formatted table desc1") {
 
     val resultCol = Seq("", "", "##Detailed Column property", "##Detailed Table Information", "ADAPTIVE", "CARBON Store Path", "Comment", "Database Name", "Last Update Time",
-    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
+    "SORT_COLUMNS", "SORT_SCOPE", "Streaming", "Table Block Size", "Local Dictionary Enabled", "Local Dictionary Threshold","Table Data Size", "Table Index Size", "Table Name", "dec2col1", "dec2col2", "dec2col3", "dec2col4")
     val resultRow: Seq[Row] = resultCol map(propName => Row(f"$propName%-36s"))
     checkAnswer(sql("desc formatted DESC1").select("col_name"), resultRow)
-    assert(sql("desc formatted desc1").count() == 20)
+    assert(sql("desc formatted desc1").count() == 22)
   }
 
   test("test describe formatted for partition table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index b43ae3f..1ccbf6a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary
 import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, DataMapSchemaStorageProvider}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.processing.exception.DataLoadingException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7d28790..65ff76d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
 import scala.language.implicitConversions
+import scala.util.Try
 import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.ql.lib.Node
@@ -43,7 +44,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataTypeConverterUtil}
 
 /**
  * TODO remove the duplicate code and add the common methods to common class.
@@ -292,6 +293,83 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         s"${CarbonCommonConstants.COLUMN_GROUPS} is deprecated")
     }
 
+    // validate the local dictionary property if defined
+    if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined) {
+      Try(tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).toBoolean) match {
+        case scala.util.Success(value) =>
+        case scala.util.Failure(ex) =>
+          tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+      }
+    } else {
+      // if LOCAL_DICTIONARY_ENABLE is not defined, consider the default value which is true
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+        CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+    }
+
+    // validate the local dictionary threshold property if defined
+    if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).isDefined) {
+      // if any invalid value is configured for LOCAL_DICTIONARY_THRESHOLD, then default value
+      // will be
+      // considered which is 1000
+      Try(tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD).toInt) match {
+        case scala.util.Success(value) =>
+          if (value <= 0) {
+            tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+              CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+          }
+        case scala.util.Failure(ex) =>
+          LOGGER
+            .debug(
+              "invalid value is configured for local_dictionary_threshold, considering the defaut" +
+              " " +
+              "value")
+          tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+      }
+    }
+
+    // validate the local dictionary columns defined, this we will validated if the local dictionary
+    // is enabled, else it is not validated
+    if (!(tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined &&
+          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).trim
+            .equalsIgnoreCase("false"))) {
+      var localDictIncludeColumns: Seq[String] = Seq[String]()
+      var localDictExcludeColumns: Seq[String] = Seq[String]()
+      val isLocalDictIncludeDefined = tableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+        .isDefined
+      val isLocalDictExcludeDefined = tableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+        .isDefined
+      if (isLocalDictIncludeDefined) {
+        localDictIncludeColumns =
+          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).split(",").map(_.trim)
+        // validate all the local dictionary include columns
+        validateLocalDictionaryColumns(fields, tableProperties, localDictIncludeColumns)
+      }
+      if (isLocalDictExcludeDefined) {
+        localDictExcludeColumns =
+          tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).split(",").map(_.trim)
+        // validate all the local dictionary exclude columns
+        validateLocalDictionaryColumns(fields, tableProperties, localDictExcludeColumns)
+      }
+      // validate if both local dictionary include and exclude contains same column
+      if (isLocalDictIncludeDefined && isLocalDictExcludeDefined) {
+        val localDictIncludeCols = tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+        val localDictExcludeCols = tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+        if (List(localDictIncludeCols, localDictExcludeCols).mkString(",").split(",")
+              .distinct.length !=
+            List(localDictIncludeCols, localDictExcludeCols).mkString(",").split(",")
+              .length) {
+          val errMsg =
+            "Column ambiguity as duplicate columns present in LOCAL_DICTIONARY_INCLUDE and " +
+            "LOCAL_DICTIONARY_INCLUDE.Duplicate columns are not allowed."
+          throw new MalformedCarbonCommandException(errMsg)
+        }
+      }
+    }
+
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
     // get partitionInfo
@@ -322,6 +400,65 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
+   * This method validates the local dictionary configured columns
+   *
+   * @param fields
+   * @param tableProperties
+   */
+  private def validateLocalDictionaryColumns(fields: Seq[Field],
+      tableProperties: Map[String, String], localDictColumns: Seq[String]): Unit = {
+    var dictIncludeColumns: Seq[String] = Seq[String]()
+
+    // check if the duplicate columns are specified in table schema
+    if (localDictColumns.distinct.lengthCompare(localDictColumns.size) != 0) {
+      val a = localDictColumns.diff(localDictColumns.distinct).distinct
+      val errMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE contains Duplicate Columns " +
+                   a.mkString("(", ",", ")") +
+                   ". Please check create table statement."
+      throw new MalformedCarbonCommandException(errMsg)
+    }
+
+    // check if the column specified exists in table schema
+    localDictColumns.foreach { distCol =>
+      if (!fields.exists(x => x.column.equalsIgnoreCase(distCol.trim))) {
+        val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " + distCol.trim +
+                       " does not exist in table. Please check create table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+
+    // check if column is other than string datatype
+    localDictColumns.foreach { dictColm =>
+      if (fields
+        .exists(x => x.column.equalsIgnoreCase(dictColm) &&
+                     !x.dataType.get.equalsIgnoreCase("STRING") &&
+                     !x.dataType.get.equalsIgnoreCase("STRUCT") &&
+                     !x.dataType.get.equalsIgnoreCase("ARRAY"))) {
+        val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
+                       dictColm.trim +
+                       " is not a String datatype column. LOCAL_DICTIONARY_COLUMN should be no " +
+                       "dictionary string datatype column.Please check create table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+    // check if the same column is present in both dictionary include and local dictionary columns
+    // configuration
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeColumns =
+        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
+      localDictColumns.foreach { distCol =>
+        if (dictIncludeColumns.exists(x => x.equalsIgnoreCase(distCol.trim))) {
+          val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
+                         distCol.trim +
+                         " specified in Dictionary include. Local Dictionary will not be " +
+                         "generated for Dictionary include. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+  }
+
+  /**
    * Extract the column groups configuration from table properties.
    * Based on this Row groups of fields will be determined.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index aa40a1f..d48db21 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -40,7 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
@@ -246,7 +245,6 @@ class AlterTableColumnSchemaGenerator(
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
     })
-
     allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
     alterTableModel.msrCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
@@ -294,7 +292,9 @@ class AlterTableColumnSchemaGenerator(
     alterTableModel.tableProperties.foreach {
       x => val value = tablePropertiesMap.get(x._1)
         if (null != value) {
-          tablePropertiesMap.put(x._1, value + "," + x._2)
+          if (value != x._2) {
+            tablePropertiesMap.put(x._1, value + "," + x._2)
+          }
         } else {
           tablePropertiesMap.put(x._1, x._2)
         }
@@ -550,6 +550,11 @@ class TableNewProcessor(cm: TableModel) {
       }
     }
 
+    // check whether the column is a local dictionary column and set in column schema
+    if (null != cm.tableProperties) {
+      CarbonUtil
+        .setLocalDictColumnsToWrapperSchema(allColumns.asJava, cm.tableProperties.asJava)
+    }
     cm.msrCols.foreach { field =>
       // if aggregate function is defined in case of preaggregate and agg function is sum or avg
       // then it can be stored as measure

http://git-wip-us.apache.org/repos/asf/carbondata/blob/be20fefb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 617f5e8..c6bd567 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -30,8 +30,7 @@ import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -111,6 +110,41 @@ private[sql] case class CarbonDescribeFormattedCommand(
       .LOAD_SORT_SCOPE_DEFAULT)))
     val isStreaming = tblProps.asScala.getOrElse("streaming", "false")
     results ++= Seq(("Streaming", isStreaming, ""))
+    val isLocalDictEnabled = tblProps.asScala
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+          CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+    results ++= Seq(("Local Dictionary Enabled", isLocalDictEnabled, ""))
+    // if local dictionary is enabled, then only show other properties of local dictionary
+    if (isLocalDictEnabled.toBoolean) {
+      val localDictThreshold = tblProps.asScala
+        .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+      results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
+      if (tblProps.asScala
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).isDefined) {
+        val allLocalDictColumns = tblProps.asScala(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
+          .split(",")
+        results ++= Seq(("Local Dictionary Include", getDictColumnString(allLocalDictColumns), ""))
+      }
+      if (tblProps.asScala
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+        val allLocalDictColumns = tblProps.asScala(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
+          .split(",")
+        results ++= Seq(("Local Dictionary Exclude", getDictColumnString(allLocalDictColumns), ""))
+      }
+    }
+
+    /**
+     * return the string which has all comma separated columns
+     * @param localDictColumns
+     * @return
+     */
+    def getDictColumnString(localDictColumns: Array[String]): String = {
+      val dictColumns: StringBuilder = new StringBuilder
+      localDictColumns.foreach(column => dictColumns.append(column).append(","))
+      dictColumns.toString().patch(dictColumns.toString().lastIndexOf(","), "", 1)
+    }
+
 
     // show table level compaction options
     if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) {