Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:56 UTC

[13/14] incubator-carbondata git commit: Support splitting the schema string and storing it in the table properties; added more test cases for alter table validations.

Support splitting the schema JSON string and storing the parts in the table properties.
Added more test cases for alter table validations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/35739e5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/35739e5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/35739e5e

Branch: refs/heads/master
Commit: 35739e5eb24f3c4ff94a04d970ed889d9810c6f0
Parents: fc1af96
Author: nareshpr <pr...@gmail.com>
Authored: Mon Mar 13 16:50:52 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../schema/table/column/CarbonMeasure.java      |  16 +-
 .../RestructureBasedRawResultCollector.java     |   2 +-
 .../RestructureBasedVectorResultCollector.java  |   6 +-
 .../scan/executor/util/RestructureUtil.java     |  13 +
 .../vector/MeasureDataVectorProcessor.java      |   4 +
 .../src/test/resources/restructure/data1.csv    |   1 +
 .../src/test/resources/restructure/data2.csv    |   1 +
 .../src/test/resources/restructure/data3.csv    |   1 +
 .../src/test/resources/restructure/data4.csv    |   1 +
 .../src/test/resources/restructure/data5.csv    |   1 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |  17 ++
 .../execution/command/carbonTableSchema.scala   |  31 ++-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  17 +-
 .../AlterTableValidationTestCase.scala          | 238 +++++++++++++++----
 .../rowreader/AddColumnTestCases.scala          | 121 ++++++++++
 .../rowreader/ChangeDataTypeTestCases.scala     |  95 ++++++++
 .../rowreader/DropColumnTestCases.scala         |  70 ++++++
 .../vectorreader/AddColumnTestCases.scala       | 114 +++++++++
 .../vectorreader/ChangeDataTypeTestCases.scala  |  96 ++++++++
 .../vectorreader/DropColumnTestCases.scala      |  71 ++++++
 21 files changed, 848 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fc16d06..ff257dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1159,6 +1159,11 @@ public final class CarbonCommonConstants {
 
   public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
 
+  public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD =
+      "spark.sql.sources.schemaStringLengthThreshold";
+
+  public static final int SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT = 4000;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
index 9669d82..d13dc6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
@@ -27,38 +27,26 @@ public class CarbonMeasure extends CarbonColumn {
    */
   private static final long serialVersionUID = 354341488059013977L;
 
-  /**
-   * Used when this column contains decimal data.
-   */
-  private int scale;
-
-  /**
-   * precision in decimal data
-   */
-  private int precision;
-
   public CarbonMeasure(ColumnSchema columnSchema, int ordinal) {
     this(columnSchema, ordinal, 0);
   }
 
   public CarbonMeasure(ColumnSchema columnSchema, int ordinal, int schemaOrdinal) {
     super(columnSchema, ordinal, schemaOrdinal);
-    this.scale = columnSchema.getScale();
-    this.precision = columnSchema.getPrecision();
   }
 
   /**
    * @return the scale
    */
   public int getScale() {
-    return scale;
+    return columnSchema.getScale();
   }
 
   /**
    * @return the precision
    */
   public int getPrecision() {
-    return precision;
+    return columnSchema.getPrecision();
   }
 
   /**

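The measure previously cached scale and precision at construction time, so a later
setScale/setPrecision on the underlying ColumnSchema (which RestructureUtil now
performs for restructured blocks, see below) was invisible through the getters.
Delegating to columnSchema keeps a single source of truth. A minimal, self-contained
Scala sketch of the stale-copy pitfall, using hypothetical classes rather than the
Carbon ones:

    class Schema(var scale: Int)
    class CachedMeasure(s: Schema) { val scale: Int = s.scale }      // snapshot at construction
    class DelegatingMeasure(s: Schema) { def scale: Int = s.scale }  // reads the current value

    val schema = new Schema(2)
    val cached = new CachedMeasure(schema)
    val delegating = new DelegatingMeasure(schema)
    schema.scale = 5                  // what an ALTER-driven setScale effectively does
    assert(cached.scale == 2)         // stale copy: the old behaviour
    assert(delegating.scale == 5)     // delegation: the fixed behaviour
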
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 17271c2..14867d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -244,7 +244,7 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
           byte[] newColumnDefaultValue = null;
           Object defaultValue = dimensionInfo.getDefaultValues()[i];
           if (null != defaultValue) {
-            newColumnDefaultValue = UTF8String.fromString((String) defaultValue).getBytes();
+            newColumnDefaultValue = ((UTF8String)defaultValue).getBytes();
           } else {
             newColumnDefaultValue =
                 UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index 25653da..b1ce040 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -104,6 +104,9 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
     int measureExistIndex = 0;
     for (int i = 0; i < queryMeasures.length; i++) {
       if (!measureInfo.getMeasureExists()[i]) {
+        // add a dummy column vector result collector object
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
         continue;
       }
       QueryMeasure currentBlockMeasure =
@@ -174,7 +177,7 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
    * This method will fill the default values of non existing dimensions in the current block
    */
   private void fillDataForNonExistingDimensions() {
-    for (int i = 0; i < tableBlockExecutionInfos.getActualQueryMeasures().length; i++) {
+    for (int i = 0; i < tableBlockExecutionInfos.getActualQueryDimensions().length; i++) {
       if (!dimensionInfo.getDimensionExists()[i]) {
         CarbonDimension dimension =
             tableBlockExecutionInfos.getActualQueryDimensions()[i].getDimension();
@@ -200,6 +203,7 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
    *
    * @param vector
    * @param columnVectorInfo
+   *
    * @param defaultValue
    */
   private void fillDictionaryData(CarbonColumnVector vector, ColumnVectorInfo columnVectorInfo,

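Two fixes land in this collector. First, a measure that does not exist in the block
being scanned now gets a placeholder ColumnVectorInfo, because allColumnInfo is
indexed by query order and a skipped slot would stay null. Second,
fillDataForNonExistingDimensions looped over the actual query measures while filling
dimension defaults; it now loops over the actual query dimensions. A small Scala
sketch of the slot-alignment point, with hypothetical names:

    case class VectorInfo(source: String)
    val measureExists = Array(true, false, true)   // column 1 was added after this block was written
    val queryOrder    = Array(0, 1, 2)             // position of each measure in the projection
    val allColumnInfo = new Array[VectorInfo](3)
    for (i <- 0 until 3) {
      allColumnInfo(queryOrder(i)) =
        if (measureExists(i)) VectorInfo(s"measure $i read from block")
        else VectorInfo("placeholder, later filled with the default value")  // previously left null
    }
    assert(allColumnInfo.forall(_ != null))
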
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 59fd215..ac6d60e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -75,6 +75,14 @@ public class RestructureUtil {
         for (CarbonDimension tableDimension : tableBlockDimensions) {
           if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
             QueryDimension currentBlockDimension = new QueryDimension(tableDimension.getColName());
+            tableDimension.getColumnSchema()
+                .setDataType(queryDimension.getDimension().getDataType());
+            tableDimension.getColumnSchema()
+                .setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision());
+            tableDimension.getColumnSchema()
+                .setScale(queryDimension.getDimension().getColumnSchema().getScale());
+            tableDimension.getColumnSchema()
+                .setDefaultValue(queryDimension.getDimension().getDefaultValue());
             currentBlockDimension.setDimension(tableDimension);
             currentBlockDimension.setQueryOrder(queryDimension.getQueryOrder());
             presentDimension.add(currentBlockDimension);
@@ -309,6 +317,11 @@ public class RestructureUtil {
       for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
         if (carbonMeasure.getColumnId().equals(queryMeasure.getMeasure().getColumnId())) {
           QueryMeasure currentBlockMeasure = new QueryMeasure(carbonMeasure.getColName());
+          carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
+          carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
+          carbonMeasure.getColumnSchema().setScale(queryMeasure.getMeasure().getScale());
+          carbonMeasure.getColumnSchema()
+              .setDefaultValue(queryMeasure.getMeasure().getDefaultValue());
           currentBlockMeasure.setMeasure(carbonMeasure);
           currentBlockMeasure.setQueryOrder(queryMeasure.getQueryOrder());
           presentMeasure.add(currentBlockMeasure);

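The key idea here: when a query column is matched to a column of an older block by
column id, the block-side ColumnSchema is overwritten with the query side's current
data type, precision, scale, and default value, so data written before an ALTER
TABLE CHANGE is surfaced in the altered type. A runnable Scala sketch under
simplified, hypothetical types:

    case class ColumnSchema(var dataType: String, var precision: Int, var scale: Int)
    case class Column(id: String, schema: ColumnSchema)

    def alignBlockWithQuery(block: Column, query: Column): Unit =
      if (block.id == query.id) {
        block.schema.dataType  = query.schema.dataType   // e.g. decimal widened by ALTER
        block.schema.precision = query.schema.precision
        block.schema.scale     = query.schema.scale
      }

    val block = Column("c1", ColumnSchema("decimal", 6, 2))  // written before the ALTER
    val query = Column("c1", ColumnSchema("decimal", 9, 5))  // current table schema
    alignBlockWithQuery(block, query)
    assert(block.schema == ColumnSchema("decimal", 9, 5))
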
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index ecf82d6..6f238c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -166,6 +166,7 @@ public class MeasureDataVectorProcessor {
       int vectorOffset = info.vectorOffset;
       CarbonColumnVector vector = info.vector;
       int precision = info.measure.getMeasure().getPrecision();
+      int newMeasureScale = info.measure.getMeasure().getScale();
       BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
       for (int i = offset; i < len; i++) {
         if (nullBitSet.get(i)) {
@@ -173,6 +174,9 @@ public class MeasureDataVectorProcessor {
         } else {
           BigDecimal decimal =
               dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(i);
+          if (decimal.scale() < newMeasureScale) {
+            decimal = decimal.setScale(newMeasureScale);
+          }
           Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
           vector.putDecimal(vectorOffset, toDecimal, precision);
         }

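Old blocks store decimals with the scale that was current at load time, so after an
ALTER TABLE CHANGE to a larger scale the read path must pad the stored value up to
the new scale. Increasing a BigDecimal's scale only appends zeros and never rounds,
which is why no RoundingMode is needed and why the code only rescales when
decimal.scale() < newMeasureScale. A minimal Scala sketch of the rule, outside the
Carbon classes:

    import java.math.BigDecimal

    def alignScale(stored: BigDecimal, newMeasureScale: Int): BigDecimal =
      if (stored.scale() < newMeasureScale) stored.setScale(newMeasureScale)
      else stored

    // e.g. after ALTER TABLE ... CHANGE decimalField decimalField decimal(9,5):
    assert(alignScale(new BigDecimal("21.23"), 5) == new BigDecimal("21.23000"))
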
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common-test/src/test/resources/restructure/data1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data1.csv b/integration/spark-common-test/src/test/resources/restructure/data1.csv
new file mode 100644
index 0000000..f5ee3dd
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data1.csv
@@ -0,0 +1 @@
+100,spark,abc,2015-04-23 12:01:01,21.23

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common-test/src/test/resources/restructure/data2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data2.csv b/integration/spark-common-test/src/test/resources/restructure/data2.csv
new file mode 100644
index 0000000..a33ad96
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data2.csv
@@ -0,0 +1 @@
+101,spark1,2016-04-23 12:01:01,312.23,def

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common-test/src/test/resources/restructure/data3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data3.csv b/integration/spark-common-test/src/test/resources/restructure/data3.csv
new file mode 100644
index 0000000..5498f9e
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data3.csv
@@ -0,0 +1 @@
+102,spark2,2017-04-23 12:01:01,22.2722,mkg

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common-test/src/test/resources/restructure/data4.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data4.csv b/integration/spark-common-test/src/test/resources/restructure/data4.csv
new file mode 100644
index 0000000..49a678a
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data4.csv
@@ -0,0 +1 @@
+104,spark4,2018-04-23 12:01:01,411.23

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common-test/src/test/resources/restructure/data5.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/restructure/data5.csv b/integration/spark-common-test/src/test/resources/restructure/data5.csv
new file mode 100644
index 0000000..2a1905d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/restructure/data5.csv
@@ -0,0 +1 @@
+106,spark6,pqr,27.13,2004-04-23 12:01:01

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 8580691..39b27ca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -20,8 +20,11 @@ package org.apache.carbondata.spark.util
 import java.io.File
 import java.text.SimpleDateFormat
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.DataTypeInfo
+import org.apache.spark.sql.hive.HiveExternalCatalog._
+import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -294,4 +297,18 @@ object CarbonScalaUtil {
     columnSchema.schemaOrdinal = thriftColumnSchema.schemaOrdinal
     columnSchema
   }
+
+  def prepareSchemaJsonForAlterTable(sparkConf: SparkConf, schemaJsonString: String): String = {
+    val threshold = sparkConf
+      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
+        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    var schemaParts: Seq[String] = Seq.empty
+    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
+    parts.zipWithIndex.foreach { case (part, index) =>
+      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
+    }
+    schemaParts.mkString(",")
+  }
 }

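Context for the new helper: the Hive metastore stores each table property value in a
bounded column, so a long Spark schema JSON cannot be kept in a single
'spark.sql.sources.schema' property. Spark's HiveExternalCatalog chunks the JSON
into a numParts counter plus numbered part keys, and prepareSchemaJsonForAlterTable
reproduces that layout, with the chunk size taken from
spark.sql.sources.schemaStringLengthThreshold (default 4000). A standalone Scala
sketch with the Spark constants written out literally and the threshold shrunk to 10
for readability:

    def splitForTblProperties(json: String, threshold: Int): String = {
      val parts  = json.grouped(threshold).toSeq
      val header = s"'spark.sql.sources.schema.numParts'='${parts.size}'"
      val body   = parts.zipWithIndex.map { case (part, i) =>
        s"'spark.sql.sources.schema.part.$i'='$part'"
      }
      (header +: body).mkString(",")
    }

    // splitForTblProperties("abcdefghijklmnopqrstuvwxyz", 10) yields:
    //   'spark.sql.sources.schema.numParts'='3','spark.sql.sources.schema.part.0'='abcdefghij',
    //   'spark.sql.sources.schema.part.1'='klmnopqrst','spark.sql.sources.schema.part.2'='uvwxyz'
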
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7f27c75..1b8b34d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -165,7 +165,22 @@ private[sql] case class AlterTableDataTypeChange(
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       val columnName = alterTableDataTypeChangeModel.columnName
       var carbonColumnToBeModified: CarbonColumn = null
-      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+
+      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName does not exist")
+        sys.error(s"Column does not exist: $columnName")
+      }
+      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+      if (carbonColumn.size == 1) {
+        CarbonScalaUtil
+          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn(0))
+      } else {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName is invalid")
+        sys.error(s"Invalid Column: $columnName")
+      }
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -175,7 +190,7 @@ private[sql] case class AlterTableDataTypeChange(
       // maintain the added column for schema evolution history
       var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
       var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
       columnSchemaList.foreach { columnSchema =>
         if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
           deletedColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
@@ -200,8 +215,10 @@ private[sql] case class AlterTableDataTypeChange(
       val tableIdentifier = TableIdentifier(tableName, Some(dbName))
       val schema = CarbonEnv.get.carbonMetastore
         .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      val schemaParts = CarbonScalaUtil
+        .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
-        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
@@ -284,8 +301,10 @@ private[sql] case class AlterTableAddColumns(
       val tableIdentifier = TableIdentifier(tableName, Some(dbName))
       val schema = CarbonEnv.get.carbonMetastore
         .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      val schemaParts = CarbonScalaUtil
+        .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
-        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
@@ -494,8 +513,10 @@ private[sql] case class AlterTableDropColumns(
       val tableIdentifier = TableIdentifier(tableName, Some(dbName))
       val schema = CarbonEnv.get.carbonMetastore
         .lookupRelation(tableIdentifier)(sparkSession).schema.json
+      val schemaParts = CarbonScalaUtil
+        .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
-        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory

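Beyond routing all three ALTER flows through the chunked schema properties (the
runSqlHive statement now sets numParts/part.N keys instead of one oversized
'spark.sql.sources.schema' value), the data type change command gains pre-checks:
invisible columns are filtered out, the target column must exist, and exactly one
visible match may be validated and modified. A simplified, hypothetical Scala sketch
of those pre-checks:

    case class Col(name: String, invisible: Boolean)

    def findModifiable(cols: Seq[Col], target: String): Col = {
      val matches = cols.filter(!_.invisible)
                        .filter(_.name.equalsIgnoreCase(target))
      matches match {
        case Seq(one) => one                                    // proceed to datatype validation
        case Seq()    => sys.error(s"Column does not exist: $target")
        case _        => sys.error(s"Invalid Column: $target")
      }
    }

    // findModifiable(Seq(Col("empno", false)), "EMPNO") returns Col("empno", false);
    // findModifiable(Seq(Col("empno", false)), "abcd") raises "Column does not exist: abcd"
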
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4960783..59b9f63 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -163,11 +164,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
         val tableProps = if (tblProp.isDefined) {
           // default value should not be converted to lower case
-          val tblProps = tblProp.get.map(f => if (f._1.toLowerCase.startsWith("default.value.")) {
-            f._1 -> f._2
-          } else {
-            f._1 -> f._2.toLowerCase
-          })
+          val tblProps = tblProp.get
+            .map(f => if (CarbonCommonConstants.TABLE_BLOCKSIZE.equalsIgnoreCase(f._1) ||
+                          CarbonCommonConstants.NO_INVERTED_INDEX.equalsIgnoreCase(f._1) ||
+                          CarbonCommonConstants.COLUMN_GROUPS.equalsIgnoreCase(f._1)) {
+              throw new MalformedCarbonCommandException(
+                s"Unsupported Table property in add column: ${ f._1 }")
+            } else if (f._1.toLowerCase.startsWith("default.value.")) {
+              f._1 -> f._2
+            } else {
+              f._1 -> f._2.toLowerCase
+            })
           scala.collection.mutable.Map(tblProps: _*)
         } else {
           scala.collection.mutable.Map.empty[String, String]

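The parser now rejects table-level properties that make no sense on an ADD COLUMNS
statement (block size, inverted index, column groups) instead of silently carrying
them along, while still preserving the case of default values. A hedged sketch of
the same check over a plain property map, with the constant keys written literally:

    val blockedOnAddColumn = Set("table_blocksize", "no_inverted_index", "column_groups")

    def normalizeAddColumnProps(props: Map[String, String]): Map[String, String] =
      props.map { case (key, value) =>
        if (blockedOnAddColumn.contains(key.toLowerCase)) {
          sys.error(s"Unsupported Table property in add column: $key")
        } else if (key.toLowerCase.startsWith("default.value.")) {
          key -> value                 // default values must keep their original case
        } else {
          key -> value.toLowerCase
        }
      }

    // normalizeAddColumnProps(Map("TABLE_BLOCKSIZE" -> "256")) raises an error;
    // normalizeAddColumnProps(Map("DEFAULT.VALUE.dict" -> "AbCd")) keeps "AbCd" as-is.
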
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 82de5bc..1bbbfcc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -1,11 +1,12 @@
 package org.apache.spark.carbondata.restructure
 
+import java.math.{BigDecimal, RoundingMode}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.types.Decimal
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
 class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
@@ -15,35 +16,55 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists restructure")
     // clean data folder
     CarbonProperties.getInstance()
-    sql("CREATE TABLE restructure (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE restructure OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
-    sql("CREATE TABLE restructure_test (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE restructure_test OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(
+      "CREATE TABLE restructure (empno int, empname String, designation String, doj Timestamp, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE restructure OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin);
   }
 
   test("test add dictionary column") {
-    sql("alter table restructure add columns(dict int) TBLPROPERTIES ('DICTIONARY_INCLUDE'='dict', 'DEFAULT.VALUE.dict'= '9999')")
+    sql(
+      "alter table restructure add columns(dict int) TBLPROPERTIES ('DICTIONARY_INCLUDE'='dict', " +
+      "'DEFAULT.VALUE.dict'= '9999')")
     checkAnswer(sql("select distinct(dict) from restructure"), Row(9999))
   }
   test("test add no dictionary column") {
-    sql("alter table restructure add columns(nodict string) TBLPROPERTIES ('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')")
+    sql(
+      "alter table restructure add columns(nodict string) TBLPROPERTIES " +
+      "('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')")
     checkAnswer(sql("select distinct(nodict) from restructure"), Row("abcd"))
   }
   test("test add timestamp direct dictionary column") {
-    sql("alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE.tmpstmp'= '17-01-2007')")
-    checkAnswer(sql("select distinct(tmpstmp) from restructure"), Row(new java.sql.Timestamp(107,0,17,0,0,0,0)))
+    sql(
+      "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
+      ".tmpstmp'= '17-01-2007')")
+    checkAnswer(sql("select distinct(tmpstmp) from restructure"),
+      Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
     checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
   }
   test("test add msr column") {
-    sql("alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE.msrfield'= '12345.11')")
+    sql(
+      "alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" +
+      ".msrfield'= '123.45')")
     checkExistence(sql("desc restructure"), true, "msrfielddecimal(5,2)")
+    val output = sql("select msrField from restructure").collect
+    checkAnswer(sql("select distinct(msrField) from restructure"),
+      Row(new BigDecimal("123.45").setScale(2, RoundingMode.HALF_UP)))
   }
 
   test("test add all datatype supported dictionary column") {
-    sql("alter table restructure add columns(strfld string, datefld date, tptfld timestamp, shortFld smallInt, " +
-        "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" +
-        "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE.dblFld'= '12345')")
-    checkAnswer(sql("select distinct(dblFld) from restructure"), Row(java.lang.Double.parseDouble("12345")))
+    sql(
+      "alter table restructure add columns(strfld string, datefld date, tptfld timestamp, " +
+      "shortFld smallInt, " +
+      "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE" +
+      ".dblFld'= '12345')")
+    checkAnswer(sql("select distinct(dblFld) from restructure"),
+      Row(java.lang.Double.parseDouble("12345")))
     checkExistence(sql("desc restructure"), true, "strfldstring")
     checkExistence(sql("desc restructure"), true, "dateflddate")
     checkExistence(sql("desc restructure"), true, "tptfldtimestamp")
@@ -58,11 +79,10 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
     sql("alter table restructure add columns(dcmlfld decimal(5,4))")
     try {
       sql("alter table restructure add columns(dcmlfld string)")
-      assert(false)
+      sys.error("Exception should be thrown as dcmlfld is already exist as measure")
     } catch {
       case e: Exception =>
         println(e.getMessage)
-        assert(true)
     }
   }
 
@@ -70,82 +90,206 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
     sql("alter table restructure add columns(dimfld string)")
     try {
       sql("alter table restructure add columns(dimfld decimal(5,4))")
-      assert(false)
+      sys.error("Exception should be thrown as dimfld is already exist as dimension")
     } catch {
       case e: Exception =>
         println(e.getMessage)
-        assert(true)
     }
   }
 
   test("test adding existing column again") {
     sql("alter table restructure add columns(dimfld1 string, msrCol double)")
     try {
-      sql("alter table restructure add columns(dimfld1 int)TBLPROPERTIES('DICTIONARY_INCLUDE'='dimfld1')")
-      assert(false)
+      sql(
+        "alter table restructure add columns(dimfld1 int)TBLPROPERTIES" +
+        "('DICTIONARY_INCLUDE'='dimfld1')")
+      sys.error("Exception should be thrown as dimfld1 is already exist")
     } catch {
       case e: Exception =>
         println(e.getMessage)
         try {
           sql("alter table restructure add columns(msrCol decimal(5,3))")
-          assert(false)
+          sys.error("Exception should be thrown as msrCol is already exist")
         } catch {
           case e: Exception =>
             println(e.getMessage)
-            assert(true)
         }
     }
   }
 
   test("test adding no dictionary column with numeric type") {
     try {
-      sql("alter table restructure add columns(dimfld2 double) TBLPROPERTIES('DICTIONARY_EXCLUDE'='dimfld2')")
-      assert(false)
+      sql(
+        "alter table restructure add columns(dimfld2 double) TBLPROPERTIES" +
+        "('DICTIONARY_EXCLUDE'='dimfld2')")
+      sys.error("Exception should be thrown as msrCol is already exist")
     } catch {
       case e: Exception =>
         println(e.getMessage)
-        assert(true)
     }
   }
 
-  test("test to rename table") {
-    sql("alter table restructure_test rename to restructure_new")
-    val result = sql("select * from restructure_new")
-    assert(result.count().equals(10L))
+  test("test adding complex datatype column") {
+    try {
+      sql("alter table restructure add columns(arr array<string>)")
+      sys.error("Exception should be thrown for complex column add")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+  }
+
+  test("test drop and add same column with different datatype and default value") {
+    sql("alter table restructure drop columns(empname)")
+    sql(
+      "alter table restructure add columns(empname int) TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='empname', 'DEFAULT.VALUE.empname'='12345')")
+    checkAnswer(sql("select distinct(empname) from restructure"), Row(12345))
+    checkAnswer(sql("select count(empname) from restructure"), Row(10))
+  }
+
+  test("test drop column and select query on dropped column should fail") {
+    sql("alter table restructure drop columns(empname)")
+    try {
+      sql("select distinct(empname) from restructure")
+      sys.error("Exception should be thrown as selecting dropped column")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+    sql(
+      "alter table restructure add columns(empname string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='empname', 'DEFAULT.VALUE.empname'='testuser')")
+    checkAnswer(sql("select distinct(empname) from restructure"), Row("testuser"))
+    checkAnswer(sql("select count(empname) from restructure"), Row(10))
+  }
+
+  test("test add duplicate column names") {
+    try {
+      sql("alter table restructure add columns(newField string, newField int)")
+      sys.error("Exception should be thrown for duplicate column add")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+  }
+
+  test("test drop duplicate column names") {
+    try {
+      sql("alter table restructure drop columns(empname, empname)")
+      sys.error("Exception should be thrown for duplicate column drop")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+  }
+
+  test("test dropping non-existing column") {
+    try {
+      sql("alter table restructure drop columns(abcd)")
+      sys.error("Exception should be thrown for non-existing column drop")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+  }
+
+  test("test drop dimension, measure column") {
+    sql("alter table default.restructure drop columns(empno, designation, doj)")
+    checkExistence(sql("desc restructure"), false, "empnoint")
+    checkExistence(sql("desc restructure"), false, "designationstring")
+    checkExistence(sql("desc restructure"), false, "dojtimestamp")
+    assert(sql("select * from restructure").schema
+             .filter(p => p.name.equalsIgnoreCase("empno") ||
+                          p.name.equalsIgnoreCase("designation") || p.name.equalsIgnoreCase("doj"))
+             .size == 0)
+    sql("alter table restructure add columns(empno int, designation string, doj timestamp)")
+  }
+
+  test("test drop & add same column multiple times as dict, nodict, timestamp and msr") {
+    // drop and add dict column
+    sql("alter table restructure drop columns(designation)")
+    sql(
+      "alter table default.restructure add columns(designation int) TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='designation', 'DEFAULT.VALUE.designation'='12345')")
+    checkAnswer(sql("select distinct(designation) from restructure"), Row(12345))
+    // drop and add nodict column
+    sql("alter table restructure drop columns(designation)")
+    sql(
+      "alter table restructure add columns(designation string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='designation', 'DEFAULT.VALUE.designation'='abcd')")
+    checkAnswer(sql("select distinct(designation) from restructure"), Row("abcd"))
+    // drop and add directdict column
+    sql("alter table restructure drop columns(designation)")
+    sql(
+      "alter table restructure add columns(designation timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
+      ".designation'= '17-01-2007')")
+    checkAnswer(sql("select distinct(designation) from restructure"),
+      Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
+    // drop and add msr column
+    sql("alter table restructure drop columns(designation)")
+    sql(
+      "alter table default.restructure add columns(designation int) TBLPROPERTIES" +
+      "('DEFAULT.VALUE.designation'='67890')")
+    checkAnswer(sql("select distinct(designation) from restructure"), Row(67890))
   }
 
-  test("test to rename table with invalid table name") {
+  test("test change datatype of int and decimal column") {
+    sql("alter table restructure add columns(intfield int, decimalfield decimal(10,2))")
+    sql("alter table default.restructure change intfield intField bigint")
+    checkExistence(sql("desc restructure"), true, "intfieldbigint")
+    sql("alter table default.restructure change decimalfield deciMalfield Decimal(11,3)")
+  }
+
+  test("test change datatype of string to int column") {
     try {
-      sql("alter table restructure_invalid rename to restructure_new")
-      assert(false)
-    } catch{
-      case e:Exception =>
+      sql("alter table restructure change empname empname bigint")
+      sys.error("Exception should be thrown as empname string type change to bigint")
+    } catch {
+      case e: Exception =>
         println(e.getMessage)
-        assert(true)
     }
   }
 
-  test("test to rename table with table already exists") {
+  test("test change datatype of int to string column") {
     try {
-      sql("alter table restructure rename to restructure")
-      assert(false)
+      sql("alter table restructure change empno Empno string")
+      sys.error("Exception should be thrown as empno int type change to string")
     } catch {
-      case e:Exception =>
+      case e: Exception =>
         println(e.getMessage)
-        assert(true)
     }
   }
 
-  test("test to load data after rename") {
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE restructure_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
-    val result = sql("select * from restructure_new")
-    assert(result.count().equals(20L))
+  test("test change datatype of non-existing column") {
+    try {
+      sql("alter table restructure change abcd abcd string")
+      sys.error("Exception should be thrown for datatype change on non-existing column")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+  }
+
+  test("test change datatype of decimal column from higher to lower precision/scale") {
+    sql("alter table restructure add columns(decField decimal(10,2))")
+    try {
+      sql("alter table restructure change decField decField decimal(10,1)")
+      sys.error("Exception should be thrown for downgrade of scale in decimal type")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
+    try {
+      sql("alter table restructure change decField decField decimal(5,3)")
+      sys.error("Exception should be thrown for downgrade of precision in decimal type")
+    } catch {
+      case e: Exception =>
+        println(e.getMessage)
+    }
   }
 
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
-    sql("DROP TABLE IF EXISTS restructure_new")
-    sql("DROP TABLE IF EXISTS restructure_test")
   }
-
 }

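One caveat on the negative tests above: sys.error throws a RuntimeException, and
because it sits inside the same try block, the surrounding "case e: Exception"
clause catches it too, so these tests pass whether or not the ALTER actually failed.
ScalaTest's intercept both asserts that an exception is thrown and fails the test
when none is; a sketch of one test rewritten that way, assuming the same QueryTest
suite context:

    test("test adding existing column again") {
      intercept[Exception] {
        sql("alter table restructure add columns(dimfld1 int)" +
            "TBLPROPERTIES('DICTIONARY_INCLUDE'='dimfld1')")
      }
    }
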
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
new file mode 100644
index 0000000..6ffa2d0
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
@@ -0,0 +1,121 @@
+package org.apache.spark.carbondata.restructure.rowreader
+
+import java.math.{BigDecimal, RoundingMode}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+    sql("DROP TABLE IF EXISTS addcolumntest")
+    sql("drop table if exists hivetable")
+    sql(
+      "CREATE TABLE addcolumntest(intField int,stringField string,timestampField timestamp," +
+      "decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE addcolumntest " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql(
+      "Alter table addcolumntest add columns(charField string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField', 'DEFAULT.VALUE.charfield'='def')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE addcolumntest " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("CREATE TABLE hivetable stored as parquet select * from addcolumntest")
+  }
+
+  test("test like query on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField like 'd%'"), Row("def"))
+  }
+
+  test("test is not null filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField is not null"),
+      Seq(Row("abc"), Row("def")))
+  }
+
+  test("test is null filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField is null"), Seq())
+  }
+
+  test("test equals filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField = 'abc'"), Row("abc"))
+  }
+
+  test("test add dictionary column and test greaterthan/lessthan filter on new column") {
+    sql(
+      "Alter table addcolumntest add columns(intnewField int) TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='intnewField', 'DEFAULT.VALUE.intNewField'='5')")
+    checkAnswer(sql("select charField from addcolumntest where intnewField > 2"),
+      Seq(Row("abc"), Row("def")))
+    checkAnswer(sql("select charField from addcolumntest where intnewField < 2"), Seq())
+  }
+
+  test("test compaction after adding new column") {
+    sql("Alter table addcolumntest compact 'major'")
+    checkExistence(sql("show segments for table addcolumntest"), true, "0Compacted")
+    checkExistence(sql("show segments for table addcolumntest"), true, "1Compacted")
+    checkExistence(sql("show segments for table addcolumntest"), true, "0.1Success")
+    checkAnswer(sql("select charField from addcolumntest"), Seq(Row("abc"), Row("def")))
+  }
+
+  test("test add msr column and check aggregate") {
+    sql(
+      "alter table addcolumntest add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" +
+      ".msrfield'= '123.45')")
+    checkAnswer(sql("select sum(msrField) from addcolumntest"),
+      Row(new BigDecimal("246.90").setScale(2, RoundingMode.HALF_UP)))
+  }
+
+  test("test join on new column") {
+    checkAnswer(sql(
+      "select t1.charField, t2.charField from addcolumntest t1, hivetable t2 where t1.charField =" +
+      " t2.charField"),
+      Seq(Row("abc", "abc"), Row("def", "def")))
+  }
+
+  test("test add and drop column with data loading") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql(
+      "CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2))STORED BY 'carbondata' TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table carbon_table drop columns(timestampField)")
+    sql("select * from carbon_table").collect
+    sql("Alter table carbon_table add columns(timestampField timestamp)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data5.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,decimalField,timestampField')")
+    sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+  test("test add/drop and change datatype") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql(
+      "CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2))STORED BY 'carbondata' TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table carbon_table drop columns(charField)")
+    sql("select * from carbon_table").collect
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql(
+      "Alter table carbon_table add columns(charField string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data2.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField,charField')")
+    sql("select * from carbon_table").collect
+    sql("ALTER TABLE carbon_table CHANGE decimalField decimalField decimal(22,6)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data3.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField,charField')")
+    sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS addcolumntest")
+    sql("drop table if exists hivetable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/ChangeDataTypeTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/ChangeDataTypeTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/ChangeDataTypeTestCases.scala
new file mode 100644
index 0000000..369d002
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/ChangeDataTypeTestCases.scala
@@ -0,0 +1,95 @@
+package org.apache.spark.carbondata.restructure.rowreader
+
+import java.math.BigDecimal
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+    sql("DROP TABLE IF EXISTS changedatatypetest")
+    sql("drop table if exists hivetable")
+  }
+
+  test("test change datatype on existing column and load data, insert into hive table") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(
+      "CREATE TABLE hivetable(intField bigint,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2)) stored as parquet")
+    sql("insert into table hivetable select * from changedatatypetest")
+    afterAll
+  }
+
+  test("test datatype change and filter") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select charField from changedatatypetest where intField > 99"),
+      Seq(Row("abc"), Row("abc")))
+    checkAnswer(sql("select charField from changedatatypetest where intField < 99"), Seq())
+    checkAnswer(sql("select charField from changedatatypetest where intField = 100"),
+      Seq(Row("abc"), Row("abc")))
+    afterAll
+  }
+
+
+  test("test change int datatype and load data") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select sum(intField) from changedatatypetest"), Row(200))
+    afterAll
+  }
+
+  test("test change decimal datatype and compaction") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change decimalField decimalField decimal(9,5)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select decimalField from changedatatypetest"),
+      Seq(Row(new BigDecimal("21.23").setScale(5)), Row(new BigDecimal("21.23").setScale(5))))
+    sql("alter table changedatatypetest compact 'major'")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "0Compacted")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "1Compacted")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "0.1Success")
+    afterAll
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS changedatatypetest")
+    sql("drop table if exists hivetable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/DropColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/DropColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/DropColumnTestCases.scala
new file mode 100644
index 0000000..d659cb3
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/DropColumnTestCases.scala
@@ -0,0 +1,70 @@
+package org.apache.spark.carbondata.restructure.rowreader
+
+import java.math.{BigDecimal, RoundingMode}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.util.CarbonProperties
+
+class DropColumnTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+    sql("DROP TABLE IF EXISTS dropcolumntest")
+    sql("drop table if exists hivetable")
+  }
+
+  test("test drop column and insert into hive table") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(
+      "CREATE TABLE hivetable(intField int,stringField string,timestampField timestamp," +
+      "decimalField decimal(6,2)) stored as parquet")
+    sql("insert into table hivetable select * from dropcolumntest")
+    checkAnswer(sql("select * from hivetable"), sql("select * from dropcolumntest"))
+    afterAll
+  }
+
+  test("test drop column and load data") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    checkAnswer(sql("select count(*) from dropcolumntest"), Row(2))
+    afterAll
+  }
+
+  test("test drop column and compaction") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql("alter table dropcolumntest compact 'major'")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "0Compacted")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "1Compacted")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "0.1Success")
+    afterAll
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS dropcolumntest")
+    sql("drop table if exists hivetable")
+  }
+}
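
Note that every load issued after the DROP COLUMNS omits charField from FILEHEADER:
once a column is dropped, incoming CSVs are mapped against the reduced schema, and the
count(*) check confirms that both the old and the new segments remain readable. A
condensed sketch of that contract (paths are placeholders):

    // Sketch: loads after a column drop describe only the surviving columns.
    spark.sql("ALTER TABLE t DROP COLUMNS(charField)")
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/b.csv' INTO TABLE t " +
      "OPTIONS('FILEHEADER'='intField,stringField,timestampField,decimalField')")
    spark.sql("SELECT count(*) FROM t").show()  // old rows project without charField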

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
new file mode 100644
index 0000000..13003c7
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -0,0 +1,114 @@
+package org.apache.spark.carbondata.restructure.vectorreader
+
+import java.math.{BigDecimal, RoundingMode}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "true")
+    sql("DROP TABLE IF EXISTS addcolumntest")
+    sql("drop table if exists hivetable")
+    sql(
+      "CREATE TABLE addcolumntest(intField int,stringField string,timestampField timestamp," +
+      "decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE addcolumntest " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql(
+      "Alter table addcolumntest add columns(charField string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField', 'DEFAULT.VALUE.charfield'='def')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE addcolumntest " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+  }
+
+  test("test like query on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField like 'd%'"), Row("def"))
+  }
+
+  test("test is not null filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField is not null"),
+      Seq(Row("abc"), Row("def")))
+  }
+
+  test("test is null filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField is null"), Seq())
+  }
+
+  test("test equals filter on new column") {
+    checkAnswer(sql("select charField from addcolumntest where charField = 'abc'"), Row("abc"))
+  }
+
+  test("test add dictionary column and test greaterthan/lessthan filter on new column") {
+    sql(
+      "Alter table addcolumntest add columns(intnewField int) TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='intnewField', 'DEFAULT.VALUE.intNewField'='5')")
+    checkAnswer(sql("select charField from addcolumntest where intnewField > 2"),
+      Seq(Row("abc"), Row("def")))
+    checkAnswer(sql("select charField from addcolumntest where intnewField < 2"), Seq())
+  }
+
+  test("test add msr column and check aggregate") {
+    sql(
+      "alter table addcolumntest add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" +
+      ".msrfield'= '123.45')")
+    checkAnswer(sql("select sum(msrField) from addcolumntest"),
+      Row(new BigDecimal("246.90").setScale(2, RoundingMode.HALF_UP)))
+  }
+
+  test("test compaction after adding new column") {
+    sql("Alter table addcolumntest compact 'major'")
+    checkExistence(sql("show segments for table addcolumntest"), true, "0Compacted")
+    checkExistence(sql("show segments for table addcolumntest"), true, "1Compacted")
+    checkExistence(sql("show segments for table addcolumntest"), true, "0.1Success")
+    checkAnswer(sql("select charField from addcolumntest"), Seq(Row("abc"), Row("def")))
+  }
+
+  test("test add and drop column with data loading") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql(
+      "CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2))STORED BY 'carbondata' TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table carbon_table drop columns(timestampField)")
+    sql("select * from carbon_table").collect
+    sql("Alter table carbon_table add columns(timestampField timestamp)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data5.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,decimalField,timestampField')")
+    sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+  test("test add/drop and change datatype") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql(
+      "CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2))STORED BY 'carbondata' TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table carbon_table drop columns(charField)")
+    sql("select * from carbon_table").collect
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql(
+      "Alter table carbon_table add columns(charField string) TBLPROPERTIES" +
+      "('DICTIONARY_EXCLUDE'='charField')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data2.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField,charField')")
+    sql("select * from carbon_table").collect
+    sql("ALTER TABLE carbon_table CHANGE decimalField decimalField decimal(22,6)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data3.csv' INTO TABLE carbon_table " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField,charField')")
+    sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS addcolumntest")
+    sql("drop table if exists hivetable")
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+  }
+}
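
Most of the assertions in this suite hinge on the DEFAULT.VALUE.<column> table
property: rows loaded before the ALTER are answered with the supplied default rather
than null, which is why sum(msrField) over the two existing rows yields 246.90 for a
123.45 default. A minimal sketch (table name is illustrative):

    // Sketch: a newly added measure is served from its default for pre-existing rows.
    spark.sql("ALTER TABLE t ADD COLUMNS(msrField decimal(5,2)) " +
      "TBLPROPERTIES('DEFAULT.VALUE.msrField'='123.45')")
    spark.sql("SELECT sum(msrField) FROM t").show()  // default * existing row count, no reload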

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
new file mode 100644
index 0000000..ebf2ef0
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
@@ -0,0 +1,96 @@
+package org.apache.spark.carbondata.restructure.vectorreader
+
+import java.math.BigDecimal
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "true")
+    sql("DROP TABLE IF EXISTS changedatatypetest")
+    sql("drop table if exists hivetable")
+  }
+
+  test("test change datatype on existing column and load data, insert into hive table") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(
+      "CREATE TABLE hivetable(intField bigint,stringField string,charField string,timestampField " +
+      "timestamp,decimalField decimal(6,2)) stored as parquet")
+    sql("insert into table hivetable select * from changedatatypetest")
+    afterAll
+  }
+
+  test("test datatype change and filter") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select charField from changedatatypetest where intField > 99"),
+      Seq(Row("abc"), Row("abc")))
+    checkAnswer(sql("select charField from changedatatypetest where intField < 99"), Seq())
+    checkAnswer(sql("select charField from changedatatypetest where intField = 100"),
+      Seq(Row("abc"), Row("abc")))
+    afterAll
+  }
+
+
+  test("test change int datatype and load data") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change intField intfield bigint")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select sum(intField) from changedatatypetest"), Row(200))
+    afterAll
+  }
+
+  test("test change decimal datatype and compaction") {
+    beforeAll
+    sql(
+      "CREATE TABLE changedatatypetest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    sql("Alter table changedatatypetest change decimalField decimalField decimal(9,5)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE " +
+        s"changedatatypetest options('FILEHEADER'='intField,stringField,charField,timestampField," +
+        s"decimalField')")
+    checkAnswer(sql("select decimalField from changedatatypetest"),
+      Seq(Row(new BigDecimal("21.23").setScale(5)), Row(new BigDecimal("21.23").setScale(5))))
+    sql("alter table changedatatypetest compact 'major'")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "0Compacted")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "1Compacted")
+    checkExistence(sql("show segments for table changedatatypetest"), true, "0.1Success")
+    afterAll
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS changedatatypetest")
+    sql("drop table if exists hivetable")
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+  }
+}
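
Both reader paths exercise only widening changes: INT to BIGINT, and DECIMAL types
whose precision and scale grow (6,2 to 9,5 here, 6,2 to 22,6 in the add-column suite).
A sketch of the decimal case, showing the rescaling that the checkAnswer above encodes:

    // Sketch: after widening decimal(6,2) -> decimal(9,5), stored values read back
    // at the new scale, so 21.23 compares equal to BigDecimal("21.23").setScale(5).
    spark.sql("ALTER TABLE t CHANGE decimalField decimalField decimal(9,5)")
    spark.sql("SELECT decimalField FROM t").show()  // e.g. 21.23000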

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/35739e5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
new file mode 100644
index 0000000..ce64ffe
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
@@ -0,0 +1,71 @@
+package org.apache.spark.carbondata.restructure.vectorreader
+
+import java.math.{BigDecimal, RoundingMode}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.util.CarbonProperties
+
+class DropColumnTestCases extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sqlContext.setConf("carbon.enable.vector.reader", "true")
+    sql("DROP TABLE IF EXISTS dropcolumntest")
+    sql("drop table if exists hivetable")
+  }
+
+  test("test drop column and insert into hive table") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(
+      "CREATE TABLE hivetable(intField int,stringField string,timestampField timestamp," +
+      "decimalField decimal(6,2)) stored as parquet")
+    sql("insert into table hivetable select * from dropcolumntest")
+    checkAnswer(sql("select * from hivetable"), sql("select * from dropcolumntest"))
+    afterAll
+  }
+
+  test("test drop column and load data") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    checkAnswer(sql("select count(*) from dropcolumntest"), Row(2))
+    afterAll
+  }
+
+  test("test drop column and compaction") {
+    beforeAll
+    sql(
+      "CREATE TABLE dropcolumntest(intField int,stringField string,charField string," +
+      "timestampField timestamp,decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data1.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,charField,timestampField,decimalField')")
+    sql("Alter table dropcolumntest drop columns(charField)")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE dropcolumntest" +
+        s" options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+    sql("alter table dropcolumntest compact 'major'")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "0Compacted")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "1Compacted")
+    checkExistence(sql("show segments for table dropcolumntest"), true, "0.1Success")
+    afterAll
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS dropcolumntest")
+    sql("drop table if exists hivetable")
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+  }
+}
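
The rowreader and vectorreader suites are deliberately identical apart from the
carbon.enable.vector.reader switch, so every restructure operation is verified under
both execution modes. A sketch of how the same assertions could be driven across both
readers in one pass (loop body elided):

    // Sketch: run identical restructure checks under both reader implementations.
    for (vectorEnabled <- Seq("true", "false")) {
      sqlContext.setConf("carbon.enable.vector.reader", vectorEnabled)
      // ... create table, load, ALTER, and assert exactly as in the suites above ...
    }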