You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:55 UTC

[12/14] incubator-carbondata git commit: Restructure requirement related spark-integration changes

Restructure requirement related spark-integration changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/44bb6f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/44bb6f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/44bb6f1e

Branch: refs/heads/master
Commit: 44bb6f1e02f6f62739cdbc111bcb31f8860a09ea
Parents: 70256e7
Author: nareshpr <pr...@gmail.com>
Authored: Fri Mar 10 11:30:51 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530

----------------------------------------------------------------------
 .../spark/merger/CarbonCompactionExecutor.java  |  36 ++-
 .../spark/merger/CarbonCompactionUtil.java      |  79 +++++
 .../spark/merger/RowResultMerger.java           |   5 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  44 +--
 .../spark/rdd/DataManagementFunc.scala          |   2 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 100 ++++++
 .../carbondata/spark/util/CommonUtil.scala      |   7 +
 .../spark/util/DataTypeConverterUtil.scala      |  36 +++
 .../spark/util/GlobalDictionaryUtil.scala       | 113 ++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  55 +++-
 .../execution/command/carbonTableSchema.scala   | 168 +++++++++-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  34 +-
 .../sql/execution/command/DDLStrategy.scala     |  27 ++
 .../execution/command/carbonTableSchema.scala   | 310 ++++++++++++++++++-
 .../apache/spark/sql/hive/CarbonMetastore.scala | 121 ++++++--
 .../sql/parser/CarbonSpark2SqlParser.scala      |  83 ++++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   7 +-
 .../AlterTableValidationTestCase.scala          | 111 +++++++
 19 files changed, 1248 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
index 2f84ade..4458457 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -92,10 +93,15 @@ public class CarbonCompactionExecutor {
       String segmentId = taskMap.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
 
-      int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
-
+      List<ColumnSchema> updatedColumnSchemaList = CarbonUtil
+          .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+              carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+      int[] updatedColumnCardinalities = CarbonUtil
+          .getUpdatedColumnCardinalities(listMetadata.get(0).getColumnInTable(),
+              carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+              listMetadata.get(0).getSegmentInfo().getColumnCardinality());
       SegmentProperties sourceSegProperties =
-          new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
+          new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
 
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
@@ -171,16 +177,28 @@ public class CarbonCompactionExecutor {
 
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      dims.add(queryDimension);
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dim : dimensions) {
+      // check if dimension is deleted
+      if (!dim.isInvisible()) {
+        QueryDimension queryDimension = new QueryDimension(dim.getColName());
+        queryDimension.setDimension(dim);
+        dims.add(queryDimension);
+      }
     }
     model.setQueryDimension(dims);
 
     List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      msrs.add(queryMeasure);
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure carbonMeasure : measures) {
+      // check if measure is deleted
+      if (!carbonMeasure.isInvisible()) {
+        QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+        queryMeasure.setMeasure(carbonMeasure);
+        msrs.add(queryMeasure);
+      }
     }
     model.setQueryMeasures(msrs);
     model.setQueryId(System.nanoTime() + "");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
index ed19b27..f63778d 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -30,10 +30,15 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.commons.lang3.ArrayUtils;
+
 /**
  * Utility Class for the Compaction Flow.
  */
@@ -267,4 +272,78 @@ public class CarbonCompactionUtil {
     }
     return null;
   }
+
+  /**
+   * Merges one block's column cardinalities into the running per-column maximum.
+   * Entry i of the cardinality array is matched with entry i of the schema list, and
+   * the map is keyed by the column's unique id.
+   *
+   * @param columnCardinalityMap    column unique id to highest cardinality seen so far;
+   *                                updated in place
+   * @param currentBlockSchema      column schemas of the block being added
+   * @param currentBlockCardinality cardinalities of the block being added
+   */
+  public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
+      List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
+    for (int index = 0; index < currentBlockCardinality.length; index++) {
+      String columnUniqueId = currentBlockSchema.get(index).getColumnUniqueId();
+      int blockCardinality = currentBlockCardinality[index];
+      Integer knownCardinality = columnCardinalityMap.get(columnUniqueId);
+      // record the value when the column is new or this block raises its maximum
+      if (null == knownCardinality || blockCardinality > knownCardinality) {
+        columnCardinalityMap.put(columnUniqueId, blockCardinality);
+      }
+    }
+  }
+
+  /**
+   * Builds the cardinality array for the visible dimensions of the master (table)
+   * schema, in the order the table returns them, and appends the matching column
+   * schemas to the supplied list. A dimension with no entry in the map gets the
+   * default cardinality for its encoding.
+   *
+   * @param columnCardinalityMap    cardinality looked up by dimension.getColumnId()
+   * @param carbonTable             table whose fact-table dimensions define the result
+   * @param updatedColumnSchemaList output list; receives one schema per visible dimension
+   * @return cardinalities aligned index-by-index with the schemas that were appended
+   */
+  public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
+      CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
+    List<CarbonDimension> masterDimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<Integer> cardinalities = new ArrayList<>(columnCardinalityMap.size());
+    for (CarbonDimension dimension : masterDimensions) {
+      // invisible dimensions contribute neither a schema entry nor a cardinality
+      if (dimension.isInvisible()) {
+        continue;
+      }
+      Integer known = columnCardinalityMap.get(dimension.getColumnId());
+      int cardinality = (null == known) ? getDimensionDefaultCardinality(dimension) : known;
+      cardinalities.add(cardinality);
+      updatedColumnSchemaList.add(dimension.getColumnSchema());
+    }
+    return ArrayUtils.toPrimitive(cardinalities.toArray(new Integer[cardinalities.size()]));
+  }
+
+  /**
+   * Picks the cardinality to assume for a dimension that has no recorded value:
+   * Integer.MAX_VALUE for DIRECT_DICTIONARY, DICTIONARY_DEFAULT_CARDINALITY (plus one
+   * when a default value is set) for DICTIONARY, and -1 for anything else.
+   *
+   * @param dimension dimension whose encodings and default value are inspected
+   * @return the default cardinality for that dimension
+   */
+  private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
+    if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      return Integer.MAX_VALUE;
+    }
+    if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
+      return -1;
+    }
+    // add one to the dictionary default when the column carries a default value
+    boolean hasDefaultValue = null != dimension.getDefaultValue();
+    return hasDefaultValue
+        ? CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1
+        : CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
index 089cd0e..91a5c03 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -79,7 +79,7 @@ public class RowResultMerger {
 
   public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
       String tableName, SegmentProperties segProp, String tempStoreLocation,
-      CarbonLoadModel loadModel, int[] colCardinality, CompactionType compactionType) {
+      CarbonLoadModel loadModel, CompactionType compactionType) {
 
     CarbonDataFileAttributes carbonDataFileAttributes;
 
@@ -131,7 +131,7 @@ public class RowResultMerger {
     } else {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
     }
-    carbonFactDataHandlerModel.setColCardinality(colCardinality);
+    carbonFactDataHandlerModel.setColCardinality(segProp.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
 
@@ -202,6 +202,7 @@ public class RowResultMerger {
       }
       mergeStatus = true;
     } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
       LOGGER.error("Exception in compaction merger " + e.getMessage());
       mergeStatus = false;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7a506ba..51f9022 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -139,6 +139,7 @@ class CarbonMergerRDD[K, V](
             .toList
         }
 
+        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         // get destination segment properties as sent from driver which is of last segment.
         val segmentProperties = new SegmentProperties(
           carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
@@ -153,7 +154,7 @@ class CarbonMergerRDD[K, V](
         carbonLoadModel.setStorePath(hdfsStoreLocation)
 
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
-          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping)
+          carbonTable, dataFileMetadataSegMapping)
 
         // fire a query and get the results.
         var result2: java.util.List[RawResultIterator] = null
@@ -196,7 +197,6 @@ class CarbonMergerRDD[K, V](
             segmentProperties,
             tempStoreLoc,
             carbonLoadModel,
-            carbonMergerMapping.maxSegmentColCardinality,
             carbonMergerMapping.campactionType
           )
         mergeStatus = merger.mergerSlice()
@@ -237,21 +237,6 @@ class CarbonMergerRDD[K, V](
     iter
   }
 
-
-  def calculateCardanility(targetCardinality: Array[Int],
-      sourceCardinality: Array[Int],
-      columnSize: Int): Unit = {
-    var cols = columnSize
-
-    // Choose the highest cardinality among all the blocks.
-    while (cols > 0) {
-      if (targetCardinality(cols - 1) < sourceCardinality(cols - 1)) {
-        targetCardinality(cols - 1) = sourceCardinality(cols - 1)
-      }
-      cols -= 1
-    }
-  }
-
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
@@ -319,13 +304,9 @@ class CarbonMergerRDD[K, V](
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)
           throw e
       }
-
-      columnSize = dataFileFooter.getSegmentInfo.getColumnCardinality.size
-      carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
-        .toList
     }
 
-    var cardinality = new Array[Int](columnSize)
+    val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
 
     carbonInputSplits.foreach(splits => {
       val taskNo = splits.taskId
@@ -350,14 +331,21 @@ class CarbonMergerRDD[K, V](
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)
           throw e
       }
-
-      // Calculate the Cardinality of the new segment
-      calculateCardanility(cardinality,
-        dataFileFooter.getSegmentInfo.getColumnCardinality,
-        columnSize)
+      // add all the column and cardinality to the map
+      CarbonCompactionUtil
+        .addColumnCardinalityToMap(columnToCardinalityMap,
+          dataFileFooter.getColumnInTable,
+          dataFileFooter.getSegmentInfo.getColumnCardinality)
     }
     )
-
+    val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // update cardinality and column schema list according to master schema
+    val cardinality = CarbonCompactionUtil
+      .updateColumnSchemaAndGetCardinality(columnToCardinalityMap,
+        carbonTable,
+        updatedMaxSegmentColumnList)
+    carbonMergerMapping.maxSegmentColumnSchemaList = updatedMaxSegmentColumnList.asScala.toList
     // Set cardinality for new segment.
     carbonMergerMapping.maxSegmentColCardinality = cardinality
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index d0b3e29..3b3bac3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -194,7 +194,7 @@ object DataManagementFunc {
         )
       } catch {
         case e: Exception =>
-          LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
+          LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
           throw e
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 428df48..8580691 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -21,12 +21,14 @@ import java.io.File
 import java.text.SimpleDateFormat
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.util.CarbonProperties
 
 object CarbonScalaUtil {
@@ -194,4 +196,102 @@ object CarbonScalaUtil {
       }
     }
   }
+
+  /**
+   * Checks that a column's data type may be changed to the requested type and fails through
+   * sys.error otherwise. Accepted: INT to bigint; DECIMAL to a decimal of higher precision whose
+   * scale is not lower and which keeps at least as many digits before the decimal point.
+   * @param dataTypeInfo requested data type name with its precision and scale
+   * @param carbonColumn existing column whose current data type is checked
+   */
+  def validateColumnDataType(dataTypeInfo: DataTypeInfo, carbonColumn: CarbonColumn): Unit = {
+    carbonColumn.getDataType.getName match {
+      case "INT" =>
+        if (!dataTypeInfo.dataType.equals("bigint")) {
+          sys
+            .error(s"Given column ${ carbonColumn.getColName } with data type ${
+              carbonColumn
+                .getDataType.getName
+            } cannot be modified. Int can only be changed to bigInt")
+        }
+      case "DECIMAL" =>
+        if (!dataTypeInfo.dataType.equals("decimal")) {
+          sys
+            .error(s"Given column ${ carbonColumn.getColName } with data type ${
+              carbonColumn.getDataType.getName
+            } cannot be modified. Decimal can be only be changed to Decimal of higher precision")
+        }
+        if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
+          sys
+            .error(s"Given column ${
+              carbonColumn
+                .getColName
+            } cannot be modified. Specified precision value ${
+              dataTypeInfo
+                .precision
+            } should be greater than current precision value ${
+              carbonColumn.getColumnSchema
+                .getPrecision
+            }")
+        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) {
+          sys
+            .error(s"Given column ${
+              carbonColumn
+                .getColName
+            } cannot be modified. Specified scale value ${
+              dataTypeInfo
+                .scale
+            } should be greater or equal to current scale value ${
+              carbonColumn.getColumnSchema
+                .getScale
+            }")
+        } else {
+          // difference of precision and scale specified by user should not be less than the
+          // difference of already existing precision and scale else it will result in data loss
+          val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
+                                               carbonColumn.getColumnSchema.getScale
+          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
+          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
+            sys
+              .error(s"Given column ${
+                carbonColumn
+                  .getColName
+              } cannot be modified. Specified precision and scale values will lead to data loss")
+          }
+        }
+      case _ =>
+        sys
+          .error(s"Given column ${ carbonColumn.getColName } with data type ${
+            carbonColumn
+              .getDataType.getName
+          } cannot be modified. Only Int and Decimal data types are allowed for modification")
+    }
+  }
+
+  /**
+   * Creates a new thrift ColumnSchema and assigns each field listed below from the source.
+   * NOTE(review): fields are assigned, not cloned, so object-valued ones are shared - confirm
+   * @param thriftColumnSchema object to be cloned
+   * @return the newly created ColumnSchema
+   */
+  def createColumnSchemaCopyObject(thriftColumnSchema: org.apache.carbondata.format.ColumnSchema)
+  : org.apache.carbondata.format.ColumnSchema = {
+    val columnSchema = new org.apache.carbondata.format.ColumnSchema
+    columnSchema.column_group_id = thriftColumnSchema.column_group_id
+    columnSchema.column_name = thriftColumnSchema.column_name
+    columnSchema.columnProperties = thriftColumnSchema.columnProperties
+    columnSchema.columnReferenceId = thriftColumnSchema.columnReferenceId
+    columnSchema.column_id = thriftColumnSchema.column_id
+    columnSchema.data_type = thriftColumnSchema.data_type
+    columnSchema.default_value = thriftColumnSchema.default_value
+    columnSchema.encoders = thriftColumnSchema.encoders
+    columnSchema.invisible = thriftColumnSchema.invisible
+    columnSchema.columnar = thriftColumnSchema.columnar
+    columnSchema.dimension = thriftColumnSchema.dimension
+    columnSchema.num_child = thriftColumnSchema.num_child
+    columnSchema.precision = thriftColumnSchema.precision
+    columnSchema.scale = thriftColumnSchema.scale
+    columnSchema.schemaOrdinal = thriftColumnSchema.schemaOrdinal
+    columnSchema
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7592e4e..cf88b8c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -345,4 +345,11 @@ object CommonUtil {
 
     csvColumns
   }
+
+  // Fails with MalformedCarbonCommandException unless the names match, ignoring case.
+  def validateColumnNames(columnName: String, columnNameCopy: String): Unit = {
+    val message = "Column names provided are different. Both the column names should be same"
+    val namesDiffer = !columnName.equalsIgnoreCase(columnNameCopy)
+    if (namesDiffer) throw new MalformedCarbonCommandException(message)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 85bbb93..475650f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -82,4 +82,40 @@ object DataTypeConverterUtil {
       case DataType.STRUCT => "struct"
     }
   }
+
+  /**
+   * Maps a lower-case data type name to the matching thrift DataType.
+   *
+   * @param dataType type name such as "int" or "bigint"; may be null
+   * @return null for a null name, the thrift type for a known name, STRING otherwise
+   */
+  def convertToThriftDataType(dataType: String): org.apache.carbondata.format.DataType = {
+    import org.apache.carbondata.format.{DataType => ThriftDataType}
+    dataType match {
+      case null => null
+      case "string" =>
+        ThriftDataType.STRING
+      case "int" =>
+        ThriftDataType.INT
+      case "short" =>
+        ThriftDataType.SHORT
+      case "long" | "bigint" =>
+        ThriftDataType.LONG
+      case "double" =>
+        ThriftDataType.DOUBLE
+      case "decimal" =>
+        ThriftDataType.DECIMAL
+      case "date" =>
+        ThriftDataType.DATE
+      case "timestamp" =>
+        ThriftDataType.TIMESTAMP
+      case "array" =>
+        ThriftDataType.ARRAY
+      case "struct" =>
+        ThriftDataType.STRUCT
+      // any name not listed above falls back to STRING
+      case _ =>
+        ThriftDataType.STRING
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index ef759cf..bcb7ff7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.spark.util
 
-import java.io.FileNotFoundException
+import java.io.{FileNotFoundException, IOException}
 import java.nio.charset.Charset
 import java.util.regex.Pattern
 
@@ -41,15 +41,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.Dictionary
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.service.CarbonCommonFactory
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
 import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.csvload.StringArrayWritable
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -784,4 +786,105 @@ object GlobalDictionaryUtil {
         throw ex
     }
   }
+
+  def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
+      columnSchema: ColumnSchema,
+      tableIdentifier: CarbonTableIdentifier,
+      storePath: String,
+      defaultValue: String): Unit = {
+    // Writes dictionary, sort index and dictionary meta for a new column (see the logs below).
+    var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null
+    var dictionary: Dictionary = null
+    // the lock object is named after the column's unique id plus LockUsage.LOCK
+    val dictLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
+        columnSchema.getColumnUniqueId + LockUsage.LOCK)
+    // acquired before the try; the finally block unlocks only when this returned true
+    val isDictionaryLocked = dictLock.lockWithRetries()
+    try {
+      if (isDictionaryLocked) {
+        LOGGER.info(s"Successfully able to get the dictionary lock for ${
+          columnSchema.getColumnName
+        }")
+      } else {
+        sys.error(s"Dictionary file ${
+          columnSchema.getColumnName
+        } is locked for updation. Please try after some time")
+      }
+      val columnIdentifier = new ColumnIdentifier(columnSchema.getColumnUniqueId,
+        null,
+        columnSchema.getDataType)
+      val writer = CarbonCommonFactory.getDictionaryService
+        .getDictionaryWriter(tableIdentifier, columnIdentifier, storePath)
+      // MEMBER_DEFAULT_VAL is always written first; the normalized default follows if non-null
+      val distinctValues: java.util.List[String] = new java.util.ArrayList()
+      writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+      distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+
+      val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(defaultValue, columnSchema)
+      if (null != parsedValue) {
+        writer.write(parsedValue)
+        distinctValues.add(parsedValue)
+      }
+      if (null != writer) {
+        writer.close()
+      }
+
+      LOGGER.info(s"Dictionary file writing is successful for new column ${
+        columnSchema.getColumnName
+      }")
+      // distinctValues always contains MEMBER_DEFAULT_VAL here, so this condition always holds
+      if (distinctValues.size() > 0) {
+        dictionary = CarbonLoaderUtil.getDictionary(tableIdentifier,
+          new ColumnIdentifier(columnSchema.getColumnUniqueId, null, columnSchema.getDataType),
+          storePath,
+          columnSchema.getDataType
+        )
+        val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+        val dictService = CarbonCommonFactory.getDictionaryService
+        val dictionarySortInfo: CarbonDictionarySortInfo =
+          preparator.getDictionarySortInfo(distinctValues, dictionary,
+            columnSchema.getDataType)
+        carbonDictionarySortIndexWriter =
+          dictService.getDictionarySortIndexWriter(tableIdentifier, columnIdentifier,
+            storePath)
+        carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+        carbonDictionarySortIndexWriter
+          .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
+      }
+      // NOTE(review): both writers are closed only on the success path, not in finally
+      if (null != carbonDictionarySortIndexWriter) {
+        carbonDictionarySortIndexWriter.close()
+      }
+
+      LOGGER.info(s"SortIndex file writing is successful for new column ${
+        columnSchema.getColumnName
+      }")
+      // NOTE(review): commit() runs after close(); presumably it writes the meta file - confirm
+      if (null != writer) {
+        writer.commit()
+      }
+
+      LOGGER.info(s"Dictionary meta file writing is successful for new column ${
+        columnSchema.getColumnName
+      }")
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex)
+        throw ex
+    } finally {
+      CarbonUtil.clearDictionaryCache(dictionary)
+      if (dictLock != null && isDictionaryLocked) {
+        if (dictLock.unlock()) {
+          LOGGER.info(s"Dictionary ${
+            columnSchema.getColumnName
+          } Unlocked Successfully.")
+        } else {
+          LOGGER.error(s"Unable to unlock Dictionary ${
+            columnSchema.getColumnName
+          }")
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9bfa8a9..56f6e6d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -154,6 +154,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val ARRAY = carbonKeyWord("ARRAY")
   protected val STRUCT = carbonKeyWord("STRUCT")
 
+  protected val CHANGE = carbonKeyWord("CHANGE")
+  protected val TBLPROPERTIES = carbonKeyWord("TBLPROPERTIES")
+
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
 
@@ -229,14 +232,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       , tableName: String, fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
       tableProperties: mutable.Map[String, String],
-      bucketFields: Option[BucketFields]): TableModel = {
+      bucketFields: Option[BucketFields], isAlterFlow: Boolean = false): TableModel = {
 
     fields.zipWithIndex.foreach { x =>
       x._1.schemaOrdinal = x._2
     }
     val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
       fields, tableProperties)
-    if (dims.isEmpty) {
+    if (dims.isEmpty && !isAlterFlow) {
       throw new MalformedCarbonCommandException(s"Table ${
         dbName.getOrElse(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME)
@@ -826,6 +829,19 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       case _ => ("", "")
     }
 
+  // Parses "<int>,<int>" (such as a precision,scale pair) into a tuple of Ints.
+  protected lazy val valueOptions: Parser[(Int, Int)] =
+    (numericLit <~ ",") ~ numericLit ^^ {
+      case first ~ second => (first.toInt, second.toInt)
+    }
+
+  // Parses "<string>,<string>" into an (option, value) pair. Both literals are required by
+  // the grammar itself, so an absent value fails parsing before this mapping runs.
+  protected lazy val columnOptions: Parser[(String, String)] =
+    (stringLit <~ ",") ~ stringLit ^^ {
+      case opt ~ optvalue => (opt, optvalue)
+    }
+
   protected lazy val dimCol: Parser[Field] = anyFieldDef
 
   protected lazy val primitiveTypes =
@@ -1010,4 +1026,39 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       p.getClass.getSimpleName.equals("FloatLit") ||
       p.getClass.getSimpleName.equals("DecimalLit")
     }) ^^ (_.chars)
+
+  /**
+   * Validates a parsed data type name together with its optional numeric arguments.
+   *
+   * @param dataType type name; only "bigint" and "decimal" are accepted, matched exactly
+   * @param values   (precision, scale) pairs; required for decimal (the first pair is used),
+   *                 must be absent for bigint
+   * @return the validated type; carries precision and scale for decimal
+   * @throws MalformedCarbonCommandException if the type or its arguments are not acceptable
+   */
+  protected def parseDataType(dataType: String, values: Option[List[(Int, Int)]]): DataTypeInfo = {
+    dataType match {
+      case "bigint" =>
+        // bigint takes no (precision, scale) arguments
+        if (values.isDefined) {
+          throw new MalformedCarbonCommandException("Invalid data type")
+        }
+        DataTypeInfo(dataType)
+      case "decimal" =>
+        // headOption turns an empty argument list into the same "invalid format" error
+        // rather than an IndexOutOfBoundsException from indexing the list.
+        val (precision, scale) = values.flatMap(_.headOption).getOrElse {
+          throw new MalformedCarbonCommandException("Decimal format provided is invalid")
+        }
+        // precision should be > 0 and <= 38 and scale should be >= 0 and <= 38
+        if (precision < 1 || precision > 38) {
+          throw new MalformedCarbonCommandException("Invalid value for precision")
+        } else if (scale < 0 || scale > 38) {
+          throw new MalformedCarbonCommandException("Invalid value for scale")
+        }
+        DataTypeInfo("decimal", precision, scale)
+      case _ =>
+        throw new MalformedCarbonCommandException("Data type provided is invalid.")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index eabcaed..eb43a93 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SQLContext
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
@@ -34,11 +35,13 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo,
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.FailureCauses
 import org.apache.carbondata.spark.merger.CompactionType
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
+import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
 
 case class TableModel(
     ifNotExistsSet: Boolean,
@@ -123,6 +126,164 @@ case class CompactionCallableModel(storePath: String,
     sqlContext: SQLContext,
     compactionType: CompactionType)
 
+case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
+
+case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo, // requested target type
+    databaseName: Option[String], // None: the command uses the session's current database
+    tableName: String,
+    columnName: String, // column to alter; the command matches it ignoring case
+    newColumnName: String)
+
+case class AlterTableAddColumnsModel(
+    databaseName: Option[String], // None: the command uses the session's current database
+    tableName: String,
+    tableProperties: Map[String, String], // "default.value.<column>" keys carry defaults
+    dimCols: Seq[Field], // new dimension columns
+    msrCols: Seq[Field], // new measure columns
+    highCardinalityDims: Seq[String]) // DICTIONARY encoding is removed for these names
+
+case class AlterTableDropColumnModel(databaseName: Option[String], // None: current database
+    tableName: String,
+    columns: List[String])
+
+// Adds the columns of an ALTER TABLE ADD COLUMNS request to the schema held by tableInfo.
+class AlterTableProcessor(
+    alterTableModel: AlterTableAddColumnsModel,
+    dbName: String,
+    tableInfo: TableInfo,
+    carbonTablePath: CarbonTablePath,
+    tableIdentifier: CarbonTableIdentifier,
+    storePath: String) {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  // Builds, validates and registers the new column schemas; returns only the added columns.
+  def process: Seq[ColumnSchema] = {
+    val tableSchema = tableInfo.getFactTable
+    val tableCols = tableSchema.getListOfColumns.asScala
+    val existingColsSize = tableCols.size
+    var allColumns = tableCols.filter(x => x.isDimensionColumn)
+    var newCols = Seq[ColumnSchema]()
+
+    alterTableModel.dimCols.foreach(field => {
+      val encoders = new java.util.ArrayList[Encoding]()
+      encoders.add(Encoding.DICTIONARY)
+      val columnSchema: ColumnSchema = getColumnSchema(
+        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
+        isCol = true,
+        encoders,
+        isDimensionCol = true,
+        -1,
+        field.precision,
+        field.scale,
+        field.schemaOrdinal + existingColsSize)
+      allColumns ++= Seq(columnSchema)
+      newCols ++= Seq(columnSchema)
+    })
+
+    allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
+    alterTableModel.msrCols.foreach(field => {
+      val encoders = new java.util.ArrayList[Encoding]()
+      val columnSchema: ColumnSchema = getColumnSchema(
+        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
+        isCol = true,
+        encoders,
+        isDimensionCol = false,
+        -1,
+        field.precision,
+        field.scale,
+        field.schemaOrdinal + existingColsSize)
+      allColumns ++= Seq(columnSchema)
+      newCols ++= Seq(columnSchema)
+    })
+
+    // Reject the request when two visible columns (dimension or measure) share a name.
+    // Invisible columns are excluded from the check.
+    allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName)
+      .foreach(f => if (f._2.size > 1) {
+      val name = f._1
+      LOGGER.error(s"Duplicate column found with name: $name")
+      LOGGER.audit(
+        s"Validation failed for Create/Alter Table Operation " +
+        s"for ${ dbName }.${ alterTableModel.tableName }. " +
+        s"Duplicate column found with name: $name")
+      sys.error(s"Duplicate column found with name: $name")
+    })
+
+    val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
+    columnValidator.validateColumns(allColumns)
+
+    // populate table properties map; a value for an existing key is appended after a comma
+    val tablePropertiesMap = tableSchema.getTableProperties
+    alterTableModel.tableProperties.foreach {
+      x => val value = tablePropertiesMap.get(x._1)
+        if (null != value) {
+          tablePropertiesMap.put(x._1, value + "," + x._2)
+        } else {
+          tablePropertiesMap.put(x._1, x._2)
+        }
+    }
+    // "default.value.<column>" properties carry the default value of a newly added column
+    val defaultValuePrefix = "default.value."
+    for (elem <- alterTableModel.tableProperties) {
+      if (elem._1.toLowerCase.startsWith(defaultValuePrefix)) {
+        val colName = elem._1.substring(defaultValuePrefix.length)
+        val col = newCols.filter(p => p.getColumnName.equalsIgnoreCase(colName))
+        if (col.size == 1) {
+          val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col(0).getDataType)
+          if (null != data) {
+            col(0).setDefaultValue(data)
+          } else {
+            LOGGER.error(s"Invalid default value for new column $dbName." +
+              s"${ alterTableModel.tableName }.${ col(0).getColumnName } : ${ elem._2 }")
+          }
+          // only DICTIONARY columns without DIRECT_DICTIONARY go through the dictionary load
+          if (col(0).getEncodingList.contains(Encoding.DICTIONARY) &&
+              !col(0).getEncodingList.contains(Encoding.DIRECT_DICTIONARY)) {
+            GlobalDictionaryUtil.loadDefaultDictionaryValueForNewColumn(
+              carbonTablePath, col(0), tableIdentifier, storePath, elem._2)
+          }
+        }
+      }
+    }
+    tableSchema.setListOfColumns(allColumns.asJava)
+    tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    tableInfo.setFactTable(tableSchema)
+    newCols
+  }
+
+  // Creates the schema of one new column; the passed encoders list is adjusted in place.
+  private def getColumnSchema(dataType: DataType, colName: String, isCol: Boolean,
+      encoders: java.util.List[Encoding], isDimensionCol: Boolean,
+      colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
+    val columnSchema = new ColumnSchema()
+    columnSchema.setDataType(dataType)
+    columnSchema.setColumnName(colName)
+    if (alterTableModel.highCardinalityDims.contains(colName)) {
+      encoders.remove(Encoding.DICTIONARY)
+    }
+    if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
+      encoders.add(Encoding.DIRECT_DICTIONARY)
+    }
+    columnSchema.setEncodingList(encoders)
+    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(
+      alterTableModel.databaseName.getOrElse(dbName),
+      columnSchema)
+    columnSchema.setColumnUniqueId(columnUniqueId)
+    columnSchema.setColumnReferenceId(columnUniqueId)
+    columnSchema.setColumnar(isCol)
+    columnSchema.setDimensionColumn(isDimensionCol)
+    columnSchema.setColumnGroup(colGroup)
+    columnSchema.setPrecision(precision)
+    columnSchema.setScale(scale)
+    columnSchema.setSchemaOrdinal(schemaOrdinal)
+    columnSchema.setUseInvertedIndex(isDimensionCol)
+    columnSchema
+  }
+}
 object TableNewProcessor {
   def apply(cm: TableModel): TableInfo = {
     new TableNewProcessor(cm).process
@@ -170,11 +331,6 @@ class TableNewProcessor(cm: TableModel) {
     if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
-    val colPropMap = new java.util.HashMap[String, String]()
-    if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
-      val colProps = cm.colProps.get.get(colName)
-      colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
-    }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 32279ed..f92b1e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -122,7 +122,7 @@ case class CarbonDictionaryDecoder(
 
   val getDictionaryColumnIds = {
     val attributes = child.output
-    val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
+    val dictIds: Array[(String, ColumnIdentifier, CarbonDimension)] = attributes.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
       if (relation.isDefined && canBeDecoded(attr)) {
@@ -134,7 +134,7 @@ case class CarbonDictionaryDecoder(
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex()) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-            carbonDimension.getDataType)
+            carbonDimension)
         } else {
           (null, null, null)
         }
@@ -214,7 +214,7 @@ case class CarbonDictionaryDecoder(
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
             atiMap(f._1).getCarbonTableIdentifier,
-            f._2, f._3))
+            f._2, f._3.getDataType))
         } catch {
           case _: Throwable => null
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e28ee8f..fce9b4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -37,11 +37,30 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
-class CarbonSource extends CreatableRelationProvider
+class CarbonSource extends CreatableRelationProvider with RelationProvider
   with SchemaRelationProvider with DataSourceRegister {
 
   override def shortName(): String = "carbondata"
 
+  // will be called if hive supported create table command is provided
+  // (this overload receives no schema, only the parameters map)
+  override def createRelation(sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    CarbonEnv.init(sqlContext.sparkSession)
+    // An explicit "tablePath" parameter is used as the table location as-is. Without it the
+    // location is assembled as <store location>/<dbName>/<tableName>, taking the store
+    // location from CarbonProperties and the names from the supplied options.
+    val tablePath = parameters.get("tablePath") match {
+      case Some(path) => path
+      case None =>
+        val options = new CarbonOption(parameters)
+        val storePath = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.STORE_LOCATION)
+        storePath + "/" + options.dbName + "/" + options.tableName
+    }
+    // both cases produce the same kind of relation, differing only in the path
+    CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
+  }
   // called by any write operation like INSERT INTO DDL or DataFrame.write API
   override def createRelation(
       sqlContext: SQLContext,
@@ -108,8 +127,9 @@ class CarbonSource extends CreatableRelationProvider
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
       dataSchema: StructType): String = {
 
-    val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-    val tableName: String = parameters.getOrElse("tableName", "default_table")
+    val dbName: String = parameters.getOrElse("dbName",
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
     if (StringUtils.isBlank(tableName)) {
       throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
     }
@@ -125,7 +145,8 @@ class CarbonSource extends CreatableRelationProvider
         val fields = dataSchema.map { col =>
           val dataType = Option(col.dataType.toString)
           // This is to parse complex data types
-          val f: Field = Field(col.name, dataType, Option(col.name), None, null)
+          val colName = col.name.toLowerCase
+          val f: Field = Field(colName, dataType, Option(colName), None, null)
           // the data type of the decimal type will be like decimal(10,0)
           // so checking the start of the string and taking the precision and scale.
           // resetting the data type with decimal
@@ -139,7 +160,7 @@ class CarbonSource extends CreatableRelationProvider
           f
         }
         val map = scala.collection.mutable.Map[String, String]()
-        parameters.foreach { parameter => map.put(parameter._1, parameter._2) }
+        parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
         val bucketFields = if (options.isBucketingEnabled) {
             if (options.bucketNumber.toString.contains("-") ||
                 options.bucketNumber.toString.contains("+") ) {
@@ -147,7 +168,8 @@ class CarbonSource extends CreatableRelationProvider
                                                         options.bucketNumber.toString)
             }
             else {
-              Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+              Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
+                options.bucketNumber))
             }
           } else {
             None

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 2879130..51808bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -71,6 +71,33 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
+      case dataTypeChange@AlterTableDataTypeChange(alterTableChangeDataTypeModel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
+            alterTableChangeDataTypeModel.databaseName))(sparkSession)
+        if (isCarbonTable) {
+          ExecutedCommandExec(dataTypeChange) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case addColumn@AlterTableAddColumns(alterTableAddColumnsModel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
+            alterTableAddColumnsModel.databaseName))(sparkSession)
+        if (isCarbonTable) {
+          ExecutedCommandExec(addColumn) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case dropColumn@AlterTableDropColumns(alterTableDropColumnModel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
+            alterTableDropColumnModel.databaseName))(sparkSession)
+        if (isCarbonTable) {
+          ExecutedCommandExec(dropColumn) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
       case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
         if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
         val resolvedTable =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d1f1771..28f4df8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService
 import java.util.concurrent.Future
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
@@ -33,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
@@ -44,18 +46,20 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry}
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
 
 object Checker {
   def validateTableExists(
@@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   }
 }
 
+private[sql] case class AlterTableDataTypeChange(
+    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Changes the data type of one column in the table schema and re-syncs the schema in Hive.
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val tableName = alterTableDataTypeChangeModel.tableName
+    val dbName = alterTableDataTypeChangeModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Alter table change data type request has failed. " +
+                   s"Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val table = relation.tableMeta.carbonTable
+    val carbonLock = CarbonLockFactory.getCarbonLockObj(
+      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.METADATA_LOCK)
+    // getCarbonLockObj only creates the lock object; it still has to be acquired explicitly
+    var locked = false
+    try {
+      locked = carbonLock.lockWithRetries()
+      if (!locked) sys.error("Table is locked for updation. Please try after some time")
+      // get the latest carbon table
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+      val columnName = alterTableDataTypeChangeModel.columnName
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableMetadataFile = carbonTablePath.getSchemaFilePath
+      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+        .readSchemaFile(tableMetadataFile)
+      // maintain the added column for schema evolution history
+      var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
+      var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
+      tableInfo.fact_table.table_columns.asScala.foreach { columnSchema =>
+        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+          deletedColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
+          columnSchema.setData_type(DataTypeConverterUtil
+            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+          addColumnSchema = columnSchema
+        }
+      }
+      // an unknown column must fail here, before null entries reach the evolution history
+      if (addColumnSchema == null) {
+        sys.error(s"Column $columnName does not exist in $dbName.$tableName")
+      }
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(System.currentTimeMillis)
+      CarbonEnv.get.carbonMetastore.updateTableSchema(carbonTable.getCarbonTableIdentifier,
+        tableInfo, schemaEvolutionEntry, carbonTable.getStorePath)(sparkSession)
+
+      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+      val schema = CarbonEnv.get.carbonMetastore
+        .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
+        throw e
+    } finally {
+      // release the lock only if this command actually acquired it
+      if (locked) {
+        if (carbonLock.unlock()) {
+          LOGGER.info("Alter table change data type lock released successfully")
+        } else {
+          LOGGER.error("Unable to release lock during alter table change data type operation")
+        }
+      }
+    }
+    Seq.empty
+  }
+}
+
+private[sql] case class AlterTableAddColumns(
+    alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // Adds the requested columns to the table schema while holding the table's metadata lock,
+  // then re-syncs the schema stored in Hive.
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val tableName = alterTableAddColumnsModel.tableName
+    val dbName = alterTableAddColumnsModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Alter table add columns request has failed. " +
+                   s"Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val table = relation.tableMeta.carbonTable
+    val carbonLock = CarbonLockFactory.getCarbonLockObj(
+      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.METADATA_LOCK)
+    // getCarbonLockObj only creates the lock object; it still has to be acquired explicitly
+    var locked = false
+    try {
+      locked = carbonLock.lockWithRetries()
+      if (!locked) sys.error("Table is locked for updation. Please try after some time")
+      // get the latest carbon table
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableMetadataFile = carbonTablePath.getSchemaFilePath
+      val thriftTableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+        .readSchemaFile(tableMetadataFile)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo, dbName, tableName, carbonTable.getStorePath)
+      val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+        dbName,
+        wrapperTableInfo,
+        carbonTablePath,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).process
+      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
+      .schema.SchemaEvolutionEntry()
+      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
+      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(System.currentTimeMillis)
+      CarbonEnv.get.carbonMetastore
+        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+          thriftTable,
+          schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+          carbonTable.getStorePath)(sparkSession)
+
+      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+      val schema = CarbonEnv.get.carbonMetastore
+        .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+      LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error("Alter table add columns failed : " + e.getMessage)
+        throw e
+    } finally {
+      // release the lock only if this command actually acquired it
+      if (locked) {
+        if (carbonLock.unlock()) {
+          LOGGER.info("Alter table add columns lock released successfully")
+        } else {
+          LOGGER.error("Unable to release lock during alter table add columns operation")
+        }
+      }
+    }
+    Seq.empty
+  }
+}
+
+/**
+ * Command for ALTER TABLE ... DROP COLUMNS: marks the given columns invisible in the
+ * table schema, records them in the schema evolution history and deletes the
+ * dictionary files of dropped dictionary columns.
+ */
+private[sql] case class AlterTableDropColumns(
+    alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Drops the requested columns while holding the table metadata lock.
+   *
+   * Fails through sys.error when the table or one of the columns does not exist, when
+   * the lock cannot be acquired or when no key column would be left after the drop.
+   *
+   * @param sparkSession session used to resolve the table and refresh the catalog
+   * @return always an empty result
+   */
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val tableName = alterTableDropColumnModel.tableName
+    val dbName = alterTableDropColumnModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+    val relation =
+      CarbonEnv.get.carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Alter table drop columns request has failed. " +
+                   s"Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+
+    val table = relation.tableMeta.carbonTable
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    // obtaining the lock object does not acquire it; take the lock explicitly before the
+    // schema is touched, so that the unlock in the finally block releases a held lock
+    if (!carbonLock.lockWithRetries()) {
+      LOGGER.audit(s"Alter table drop columns request has failed. " +
+                   s"Table $dbName.$tableName is locked for updation")
+      sys.error(s"Table $dbName.$tableName is locked for updation. Please try after some time")
+    }
+    try {
+      // get the latest carbon table and check for column existence
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+      // columns that are already dropped (invisible) do not count as existing
+      val visibleColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+        .filterNot(_.isInvisible)
+      // TODO: if deleted column list includes shared dictionary/bucketted column throw an error
+      // distinct: a column that is named twice with different case is dropped only once
+      val columnsToDrop = alterTableDropColumnModel.columns.map { column =>
+        visibleColumns.find(tableColumn => column.equalsIgnoreCase(tableColumn.getColName))
+          .getOrElse(
+            sys.error(s"Column $column does not exists in the table $dbName.$tableName"))
+      }.distinct
+      val keyColumnsToDrop = columnsToDrop.filter(_.isDimesion)
+      val dictionaryColumns = keyColumnsToDrop.filter(_.hasEncoding(Encoding.DICTIONARY))
+      // at least one key column has to remain in the schema after the drop
+      if (keyColumnsToDrop.size >= visibleColumns.count(_.isDimesion)) {
+        sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+      }
+
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+        .readSchemaFile(carbonTablePath.getSchemaFilePath)
+      // mark the dropped columns invisible and keep a copy of each, taken before the
+      // change, for the schema evolution history
+      val deletedColumnSchema = for {
+        column <- alterTableDropColumnModel.columns
+        columnSchema <- tableInfo.fact_table.table_columns.asScala
+        if !columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)
+      } yield {
+        val schemaCopy = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
+        columnSchema.invisible = true
+        schemaCopy
+      }
+      // add deleted columns to schema evolution history and update the schema
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(System.currentTimeMillis)
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      schemaEvolutionEntry.setRemoved(deletedColumnSchema.asJava)
+      CarbonEnv.get.carbonMetastore
+        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+          tableInfo,
+          schemaEvolutionEntry,
+          carbonTable.getStorePath)(sparkSession)
+
+      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+      val schema = CarbonEnv.get.carbonMetastore
+        .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+      // TODO: 1. add check for deletion of index tables
+      // delete dictionary files for dictionary column and clear dictionary cache from memory
+      CarbonUtil.deleteDictionaryFileAndCache(dictionaryColumns.asJava, carbonTable)
+      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error("Alter table drop columns failed : " + e.getMessage)
+        throw e
+    } finally {
+      // release lock after command execution completion
+      if (carbonLock.unlock()) {
+        LOGGER.info("Alter table drop columns lock released successfully")
+      } else {
+        LOGGER.error("Unable to release lock during alter table drop columns operation")
+      }
+    }
+    Seq.empty
+  }
+}
+
 case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -783,13 +1079,15 @@ private[sql] case class DescribeCommandFormatted(
       .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
     val mapper = new ObjectMapper()
     val colProps = StringBuilder.newBuilder
+    val dims = relation.metaData.dims.map(x => x.toLowerCase)
     var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
-      val comment = if (relation.metaData.dims.contains(field.name)) {
+      val fieldName = field.name.toLowerCase
+      val comment = if (dims.contains(fieldName)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
           relation.tableMeta.carbonTableIdentifier.getTableName,
-          field.name)
+          fieldName)
         if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
-          colProps.append(field.name).append(".")
+          colProps.append(fieldName).append(".")
             .append(mapper.writeValueAsString(dimension.getColumnProperties))
             .append(",")
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 34bd803..2e047e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
+import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.locks.ZookeeperInit
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -251,18 +252,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
                 if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
                   val tableName = tableFolder.getName
                   val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
-
-                  val createTBase = new ThriftReader.TBaseCreator() {
-                    override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-                      new TableInfo()
-                    }
-                  }
-                  val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
-                  thriftReader.open()
-                  val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
-                  thriftReader.close()
-
+                  val tableInfo: TableInfo = readSchemaFile(tableMetadataFile)
                   val schemaConverter = new ThriftWrapperSchemaConverterImpl
                   val wrapperTableInfo = schemaConverter
                     .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
@@ -292,9 +282,56 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
   }
 
   /**
+   * Reads the thrift schema file at the given path, always closing the reader afterwards.
+   *
+   * @param schemaFilePath path of the schema file to read
+   * @return the thrift TableInfo read from the file
+   */
+  def readSchemaFile(schemaFilePath: String): TableInfo = {
+    val thriftReader = new ThriftReader(schemaFilePath, new ThriftReader.TBaseCreator() {
+      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] =
+        new TableInfo()
+    })
+    thriftReader.open()
+    try {
+      thriftReader.read().asInstanceOf[TableInfo]
+    } finally {
+      thriftReader.close()
+    }
+  }
+
+  /**
+   * Overwrites the stored schema of a table with the given thrift table info and
+   * records the supplied schema evolution entry in its evolution history. Writing
+   * the schema file and reloading the table metadata is left to createSchemaThriftFile.
+   *
+   * @param carbonTableIdentifier identifier supplying the database and table name
+   * @param thriftTableInfo thrift schema to persist; its evolution history is appended to
+   * @param schemaEvolutionEntry entry added to the schema evolution history
+   * @param carbonStorePath store path used when converting to the wrapper table info
+   * @param sparkSession session handed on to createSchemaThriftFile
+   */
+  def updateTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): Unit = {
+    val databaseName = carbonTableIdentifier.getDatabaseName
+    val tblName = carbonTableIdentifier.getTableName
+    // the wrapper is built before the new entry is appended to the thrift history
+    val wrapperInfo = new ThriftWrapperSchemaConverterImpl()
+      .fromExternalToWrapperTableInfo(thriftTableInfo, databaseName, tblName, carbonStorePath)
+    val evolutionHistory =
+      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionHistory.add(schemaEvolutionEntry)
+    createSchemaThriftFile(wrapperInfo, thriftTableInfo, databaseName, tblName)(sparkSession)
+    // TODO: log that the schema update succeeded for the given database and table name
+  }
+
+  /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableinfo
+   * Load CarbonTable from wrapper tableInfo
    *
    */
   def createTableFromThrift(
@@ -304,13 +341,36 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     if (tableExists(tableName, Some(dbName))(sparkSession)) {
       sys.error(s"Table [$tableName] already exists under Database [$dbName]")
     }
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
     thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
       .add(schemaEvolutionEntry)
+    val carbonTablePath = createSchemaThriftFile(tableInfo,
+      thriftTableInfo,
+      dbName,
+      tableName)(sparkSession)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+    carbonTablePath
+  }
 
+  /**
+   * This method will write the schema thrift file in carbon store and load table metadata
+   *
+   * @param tableInfo
+   * @param thriftTableInfo
+   * @param dbName
+   * @param tableName
+   * @param sparkSession
+   * @return
+   */
+  private def createSchemaThriftFile(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      dbName: String, tableName: String)
+    (sparkSession: SparkSession): String = {
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
       tableInfo.getFactTable.getTableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
@@ -318,24 +378,39 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
-
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
     }
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    thriftWriter.open()
+    thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+        CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
     metadata.tablesMeta += tableMeta
-    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
     carbonTablePath.getPath
   }
 
+  /**
+   * Drops the cached meta of the given table, both from this catalog's tablesMeta
+   * buffer and from the CarbonMetadata instance; only logs when nothing is cached.
+   *
+   * @param dbName database the table belongs to
+   * @param tableName name of the table whose cached meta is removed
+   */
+  private def removeTableFromMetadata(dbName: String, tableName: String) = {
+    getTableFromMetadata(dbName, tableName).fold {
+      LOGGER.debug(s"No entry for table $tableName in database $dbName")
+    } { cachedMeta =>
+      // evict the entry from the local buffer first, then from the global metadata
+      metadata.tablesMeta -= cachedMeta
+      CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+    }
+  }
+
   private def updateMetadataByWrapperTable(
       wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
 
@@ -612,7 +687,7 @@ object CarbonMetastoreTypes extends RegexParsers {
     "binary" ^^^ BinaryType |
     "boolean" ^^^ BooleanType |
     fixedDecimalType |
-    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
     "varchar\\((\\d+)\\)".r ^^^ StringType |
     "date" ^^^ DateType |
     "timestamp" ^^^ TimestampType
@@ -798,8 +873,6 @@ case class CarbonRelation(
         val output = CarbonMetastoreTypes.toDataType {
           column.getDataType.toString
             .toLowerCase match {
-            case "int" => "long"
-            case "short" => "long"
             case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
               .getColumnSchema.getScale + ")"
             case others => others

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 6ad404a..4960783 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -24,6 +24,9 @@ import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
 /**
  * TODO remove the duplicate code and add the common methods to common class.
  * Parser for All Carbon DDL, DML cases in Unified context
@@ -51,11 +54,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement| showLoads | alterTable
+    loadManagement| showLoads | alterTable | restructure
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
 
+  protected lazy val restructure: Parser[LogicalPlan] =
+    alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
 
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";")  ^^ {
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case databaseName ~ tableName ~ limit =>
         ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
     }
+
+  /**
+   * Parses ALTER TABLE [db.]table CHANGE column column dataType[(values)].
+   */
+  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+    ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
+      case db ~ tbl ~ _ ~ oldColumn ~ newColumn ~ newType ~ typeValues =>
+        // the column name is given twice; both occurrences are validated against each other
+        CommonUtil.validateColumnNames(oldColumn, newColumn)
+        val targetType = parseDataType(newType.toLowerCase, typeValues)
+        AlterTableDataTypeChange(AlterTableDataTypeChangeModel(targetType,
+          convertDbNameToLowerCase(db), tbl.toLowerCase,
+          oldColumn.toLowerCase, newColumn.toLowerCase))
+    }
+
+  /**
+   * Parses ALTER TABLE [db.]table ADD COLUMNS (fieldDefs) [TBLPROPERTIES (...)].
+   * Columns of a complex data type are rejected. Database, table and field names
+   * are lower-cased so the command resolves the table the same way as the other
+   * ALTER TABLE parsers; property values are lower-cased except "default.value.*".
+   */
+  protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case dbName ~ table ~ fields ~ tblProp =>
+        fields.foreach { f =>
+          if (f.dataType.exists(isComplexDimDictionaryExclude)) {
+            throw new MalformedCarbonCommandException(
+              s"Add column is unsupported for complex datatype column: ${f.column}")
+          }
+        }
+        // default values must keep their case; every other property value is lower-cased
+        val tableProps = scala.collection.mutable.Map(tblProp.getOrElse(Nil).map {
+          case (key, value) if key.toLowerCase.startsWith("default.value.") => key -> value
+          case (key, value) => key -> value.toLowerCase
+        }: _*)
+        val dbNameLower = convertDbNameToLowerCase(dbName)
+        // lower-case the table name once so the table model and the command agree on it
+        val tableNameLower = table.toLowerCase
+        val tableModel = prepareTableModel(false,
+          dbNameLower,
+          tableNameLower,
+          fields.map(convertFieldNamesToLowercase),
+          Seq.empty,
+          tableProps,
+          None,
+          true)
+        AlterTableAddColumns(AlterTableAddColumnsModel(dbNameLower,
+          tableNameLower,
+          tableProps,
+          tableModel.dimCols,
+          tableModel.msrCols,
+          tableModel.highcardinalitydims.getOrElse(Seq.empty)))
+    }
+
+  /** Returns a copy of the field with column and name set to the lower-cased column name. */
+  private def convertFieldNamesToLowercase(field: Field): Field =
+    field.copy(
+      column = field.column.toLowerCase, name = Some(field.column.toLowerCase))
+  // Parses ALTER TABLE [db.]table DROP COLUMNS (col, ...); column names are lower-cased.
+  protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
+    ("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ _ ~ _ ~ values =>
+        // the command matches names ignoring case, so "a, A" must count as a duplicate
+        val columnNames = values.map(_.toLowerCase)
+        columnNames.diff(columnNames.distinct).headOption.foreach { duplicate =>
+          throw new MalformedCarbonCommandException(
+            s"$duplicate is duplicate. Duplicate columns not allowed")
+        }
+        AlterTableDropColumns(AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
+          table.toLowerCase,
+          columnNames))
+    }
 }