You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 19:05:41 UTC

[carbondata] 08/22: [CARBONDATA-3348] Support alter SORT_COLUMNS property

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit d1b455f09590b48a4ba3709fa29635a18da1d790
Author: QiangCai <qi...@qq.com>
AuthorDate: Tue Apr 16 20:27:31 2019 +0800

    [CARBONDATA-3348] Support alter SORT_COLUMNS property
    
    Modification
    
    support alter SORT_COLUMNS
    alter table <table name> set tblproperties('sort_scope'='<sort scope type>', 'sort_columns'='[c1][,...cn ]')
    Limitation
    
    when a measure become a dimension and the query contain this column, the task distribution of this query will only support block and blocklet, but not merge_small_files or custom.
    
    This closes #3178
---
 .../core/constants/CarbonCommonConstants.java      |   5 +
 .../carbondata/core/datamap/DataMapFilter.java     |  89 ++++
 .../carbondata/core/datamap/TableDataMap.java      |  91 ++--
 .../datamap/dev/expr/DataMapExprWrapperImpl.java   |   3 +-
 .../core/metadata/schema/table/CarbonTable.java    |  20 +
 .../core/metadata/schema/table/TableInfo.java      |  23 +
 .../scan/executor/impl/AbstractQueryExecutor.java  |  62 +--
 .../executor/impl/QueryExecutorProperties.java     |   5 -
 .../core/scan/executor/util/RestructureUtil.java   |  75 ++-
 .../core/scan/model/QueryModelBuilder.java         |   2 +-
 .../scan/executor/util/RestructureUtilTest.java    |  11 +-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  29 +-
 .../test/resources/sort_columns/alldatatype1.csv   |  13 +
 .../test/resources/sort_columns/alldatatype2.csv   |  13 +
 .../TestAlterTableSortColumnsProperty.scala        | 541 +++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  10 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  80 ++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |  31 +-
 .../org/apache/spark/util/AlterTableUtil.scala     | 126 ++++-
 19 files changed, 1039 insertions(+), 190 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c9efc34..608b5fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -478,6 +478,11 @@ public final class CarbonCommonConstants {
    */
   public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
+  /**
+   * column level property: the measure is changed to the dimension
+   */
+  public static final String COLUMN_DRIFT = "column_drift";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Data loading parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
new file mode 100644
index 0000000..c20d0d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * the filter of DataMap
+ */
+public class DataMapFilter implements Serializable {
+
+  private CarbonTable table;
+
+  private Expression expression;
+
+  private FilterResolverIntf resolver;
+
+  public DataMapFilter(CarbonTable table, Expression expression) {
+    this.table = table;
+    this.expression = expression;
+    resolve();
+  }
+
+  public DataMapFilter(FilterResolverIntf resolver) {
+    this.resolver = resolver;
+  }
+
+  private void resolve() {
+    if (expression != null) {
+      table.processFilterExpression(expression, null, null);
+      resolver = CarbonTable.resolveFilter(expression, table.getAbsoluteTableIdentifier());
+    }
+  }
+
+  public Expression getExpression() {
+    return expression;
+  }
+
+  public void setExpression(Expression expression) {
+    this.expression = expression;
+  }
+
+  public FilterResolverIntf getResolver() {
+    return resolver;
+  }
+
+  public void setResolver(FilterResolverIntf resolver) {
+    this.resolver = resolver;
+  }
+
+  public boolean isEmpty() {
+    return resolver == null;
+  }
+
+  public boolean isResolvedOnSegment(SegmentProperties segmentProperties) {
+    if (expression == null || table == null) {
+      return true;
+    }
+    if (!table.isTransactionalTable()) {
+      return false;
+    }
+    if (table.hasColumnDrift() && RestructureUtil
+        .hasColumnDriftOnSegment(table, segmentProperties)) {
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index f9020bd..4375abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.events.Event;
@@ -100,38 +99,6 @@ public final class TableDataMap extends OperationEventListener {
     return blockletDetailsFetcher;
   }
 
-
-  /**
-   * Pass the valid segments and prune the datamap using filter expression
-   *
-   * @param segments
-   * @param filterExp
-   * @return
-   */
-  public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp,
-      List<PartitionSpec> partitions) throws IOException {
-    List<ExtendedBlocklet> blocklets = new ArrayList<>();
-    SegmentProperties segmentProperties;
-    Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
-    for (Segment segment : segments) {
-      List<Blocklet> pruneBlocklets = new ArrayList<>();
-      // if filter is not passed then return all the blocklets
-      if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
-      } else {
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
-        for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets.addAll(dataMap
-              .prune(filterExp, segmentProperties, partitions, table));
-        }
-      }
-      blocklets.addAll(addSegmentId(
-          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-          segment));
-    }
-    return blocklets;
-  }
-
   public CarbonTable getTable() {
     return table;
   }
@@ -140,10 +107,10 @@ public final class TableDataMap extends OperationEventListener {
    * Pass the valid segments and prune the datamap using filter expression
    *
    * @param segments
-   * @param filterExp
+   * @param filter
    * @return
    */
-  public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp,
+  public List<ExtendedBlocklet> prune(List<Segment> segments, final DataMapFilter filter,
       final List<PartitionSpec> partitions) throws IOException {
     final List<ExtendedBlocklet> blocklets = new ArrayList<>();
     final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
@@ -164,15 +131,15 @@ public final class TableDataMap extends OperationEventListener {
       // As 0.1 million files block pruning can take only 1 second.
       // Doing multi-thread for smaller values is not recommended as
       // driver should have minimum threads opened to support multiple concurrent queries.
-      if (filterExp == null) {
+      if (filter.isEmpty()) {
         // if filter is not passed, then return all the blocklets.
         return pruneWithoutFilter(segments, partitions, blocklets);
       }
-      return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
+      return pruneWithFilter(segments, filter, partitions, blocklets, dataMaps);
     }
     // handle by multi-thread
-    List<ExtendedBlocklet> extendedBlocklets =
-        pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
+    List<ExtendedBlocklet> extendedBlocklets = pruneMultiThread(
+        segments, filter, partitions, blocklets, dataMaps, totalFiles);
     return extendedBlocklets;
   }
 
@@ -187,14 +154,22 @@ public final class TableDataMap extends OperationEventListener {
     return blocklets;
   }
 
-  private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
-      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
-      List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException {
+  private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments, DataMapFilter filter,
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
+      Map<Segment, List<DataMap>> dataMaps) throws IOException {
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
-      for (DataMap dataMap : dataMaps.get(segment)) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      if (filter.isResolvedOnSegment(segmentProperties)) {
+        for (DataMap dataMap : dataMaps.get(segment)) {
+          pruneBlocklets.addAll(
+              dataMap.prune(filter.getResolver(), segmentProperties, partitions));
+        }
+      } else {
+        for (DataMap dataMap : dataMaps.get(segment)) {
+          pruneBlocklets.addAll(
+              dataMap.prune(filter.getExpression(), segmentProperties, partitions, table));
+        }
       }
       blocklets.addAll(
           addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
@@ -204,7 +179,7 @@ public final class TableDataMap extends OperationEventListener {
   }
 
   private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
-      final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
+      final DataMapFilter filter, final List<PartitionSpec> partitions,
       List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
       int totalFiles) {
     /*
@@ -295,14 +270,24 @@ public final class TableDataMap extends OperationEventListener {
             SegmentProperties segmentProperties =
                 segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
             Segment segment = segmentDataMapGroup.getSegment();
-            for (int i = segmentDataMapGroup.getFromIndex();
-                 i <= segmentDataMapGroup.getToIndex(); i++) {
-              List<Blocklet> dmPruneBlocklets  = dataMapList.get(i).prune(filterExp,
-                  segmentProperties,
-                  partitions);
-              pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
-                      .getExtendedBlocklets(dmPruneBlocklets, segment),
-                  segment));
+            if (filter.isResolvedOnSegment(segmentProperties)) {
+              for (int i = segmentDataMapGroup.getFromIndex();
+                   i <= segmentDataMapGroup.getToIndex(); i++) {
+                List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
+                    filter.getResolver(), segmentProperties, partitions);
+                pruneBlocklets.addAll(addSegmentId(
+                    blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
+                    segment));
+              }
+            } else {
+              for (int i = segmentDataMapGroup.getFromIndex();
+                   i <= segmentDataMapGroup.getToIndex(); i++) {
+                List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
+                    filter.getExpression(), segmentProperties, partitions, table);
+                pruneBlocklets.addAll(addSegmentId(
+                    blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
+                    segment));
+              }
             }
             synchronized (prunedBlockletMap) {
               List<ExtendedBlocklet> pruneBlockletsExisting =
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index 4643b47..bb2662b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
@@ -50,7 +51,7 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
   @Override
   public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
       throws IOException {
-    return dataMap.prune(segments, expression, partitionsToPrune);
+    return dataMap.prune(segments, new DataMapFilter(expression), partitionsToPrune);
   }
 
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 3623147..54ea772 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -121,6 +121,11 @@ public class CarbonTable implements Serializable {
   private List<CarbonMeasure> allMeasures;
 
   /**
+   * list of column drift
+   */
+  private List<CarbonDimension> columnDrift;
+
+  /**
    * table bucket map.
    */
   private Map<String, BucketingInfo> tableBucketMap;
@@ -189,6 +194,7 @@ public class CarbonTable implements Serializable {
     this.tablePartitionMap = new HashMap<>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
     this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
+    this.columnDrift = new ArrayList<CarbonDimension>();
   }
 
   /**
@@ -898,6 +904,12 @@ public class CarbonTable implements Serializable {
     for (CarbonDimension dimension : allDimensions) {
       if (!dimension.isInvisible()) {
         visibleDimensions.add(dimension);
+        Map<String, String> columnProperties = dimension.getColumnProperties();
+        if (columnProperties != null) {
+          if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
+            columnDrift.add(dimension);
+          }
+        }
       }
     }
     tableDimensionsMap.put(tableName, visibleDimensions);
@@ -912,6 +924,14 @@ public class CarbonTable implements Serializable {
     return allMeasures;
   }
 
+  public List<CarbonDimension> getColumnDrift() {
+    return columnDrift;
+  }
+
+  public boolean hasColumnDrift() {
+    return tableInfo.hasColumnDrift();
+  }
+
   /**
    * This method will all the visible allMeasures
    *
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index daba29b..ec9d311 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -91,6 +91,8 @@ public class TableInfo implements Serializable, Writable {
    */
   private boolean isTransactionalTable = true;
 
+  private boolean hasColumnDrift = false;
+
   // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
 
@@ -122,6 +124,7 @@ public class TableInfo implements Serializable, Writable {
     this.factTable = factTable;
     updateParentRelationIdentifier();
     updateIsSchemaModified();
+    updateHasColumnDrift();
   }
 
   private void updateIsSchemaModified() {
@@ -276,6 +279,7 @@ public class TableInfo implements Serializable, Writable {
     out.writeLong(lastUpdatedTime);
     out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
     out.writeBoolean(isTransactionalTable);
+    out.writeBoolean(hasColumnDrift);
     boolean isChildSchemaExists =
         null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
@@ -305,6 +309,7 @@ public class TableInfo implements Serializable, Writable {
     this.lastUpdatedTime = in.readLong();
     this.tablePath = in.readUTF();
     this.isTransactionalTable = in.readBoolean();
+    this.hasColumnDrift = in.readBoolean();
     boolean isChildSchemaExists = in.readBoolean();
     this.dataMapSchemaList = new ArrayList<>();
     if (isChildSchemaExists) {
@@ -371,4 +376,22 @@ public class TableInfo implements Serializable, Writable {
     return isSchemaModified;
   }
 
+  private void updateHasColumnDrift() {
+    this.hasColumnDrift = false;
+    for (ColumnSchema columnSchema : factTable.getListOfColumns()) {
+      if (columnSchema.isDimensionColumn() && !columnSchema.isInvisible()) {
+        Map<String, String> columnProperties = columnSchema.getColumnProperties();
+        if (columnProperties != null) {
+          if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
+            this.hasColumnDrift = true;
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  public boolean hasColumnDrift() {
+    return hasColumnDrift;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index b15bdb5..f06f5c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -139,20 +138,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     queryStatistic
         .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
     queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
-    // calculating the total number of aggregated columns
-    int measureCount = queryModel.getProjectionMeasures().size();
-
-    int currentIndex = 0;
-    DataType[] dataTypes = new DataType[measureCount];
-
-    for (ProjectionMeasure carbonMeasure : queryModel.getProjectionMeasures()) {
-      // adding the data type and aggregation type of all the measure this
-      // can be used
-      // to select the aggregator
-      dataTypes[currentIndex] = carbonMeasure.getMeasure().getDataType();
-      currentIndex++;
-    }
-    queryProperties.measureDataTypes = dataTypes;
+
     // as aggregation will be executed in following order
     // 1.aggregate dimension expression
     // 2. expression
@@ -461,14 +447,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
-    List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
-
+    // set actual query dimensions and measures. It may differ in case of restructure scenarios
+    RestructureUtil.actualProjectionOfSegment(blockExecutionInfo, queryModel, segmentProperties);
     // below is to get only those dimension in query which is present in the
     // table block
     List<ProjectionDimension> projectDimensions = RestructureUtil
         .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
-            queryModel.getProjectionDimensions(), tableBlockDimensions,
-            segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
+            blockExecutionInfo.getActualQueryDimensions(), segmentProperties.getDimensions(),
+            segmentProperties.getComplexDimensions(),
+            blockExecutionInfo.getActualQueryMeasures().length,
             queryModel.getTable().getTableInfo().isTransactionalTable());
     boolean isStandardTable = CarbonUtil.isStandardCarbonTable(queryModel.getTable());
     String blockId = CarbonUtil
@@ -486,10 +473,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo.setProjectionDimensions(projectDimensions
         .toArray(new ProjectionDimension[projectDimensions.size()]));
     // get measures present in the current block
-    List<ProjectionMeasure> currentBlockQueryMeasures =
-        getCurrentBlockQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
+    List<ProjectionMeasure> projectionMeasures = RestructureUtil
+        .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
+            blockExecutionInfo.getActualQueryMeasures(), segmentProperties.getMeasures(),
+            queryModel.getTable().getTableInfo().isTransactionalTable());
     blockExecutionInfo.setProjectionMeasures(
-        currentBlockQueryMeasures.toArray(new ProjectionMeasure[currentBlockQueryMeasures.size()]));
+        projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
     blockExecutionInfo.setDataBlock(blockIndex);
     // setting whether raw record query or not
     blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
@@ -581,7 +570,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // list of measures to be projected
     List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
     int[] measureChunkIndexes = QueryUtil.getMeasureChunkIndexes(
-        currentBlockQueryMeasures, expressionMeasures,
+        projectionMeasures, expressionMeasures,
         segmentProperties.getMeasuresOrdinalToChunkMapping(), filterMeasures,
         allProjectionListMeasureIndexes);
     reusableBufferSize = Math.max(segmentProperties.getMeasuresOrdinalToChunkMapping().size(),
@@ -637,11 +626,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo.setComplexColumnParentBlockIndexes(
         getComplexDimensionParentBlockIndexes(projectDimensions));
     blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
-    // set actual query dimensions and measures. It may differ in case of restructure scenarios
-    blockExecutionInfo.setActualQueryDimensions(queryModel.getProjectionDimensions()
-        .toArray(new ProjectionDimension[queryModel.getProjectionDimensions().size()]));
-    blockExecutionInfo.setActualQueryMeasures(queryModel.getProjectionMeasures()
-        .toArray(new ProjectionMeasure[queryModel.getProjectionMeasures().size()]));
     DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
     blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId());
     return blockExecutionInfo;
@@ -691,28 +675,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     return 0;
   }
 
-  /**
-   * Below method will be used to get the measures present in the current block
-   *
-   * @param executionInfo
-   * @param queryModel         query model
-   * @param tableBlock         table block
-   * @return
-   */
-  private List<ProjectionMeasure> getCurrentBlockQueryMeasures(BlockExecutionInfo executionInfo,
-      QueryModel queryModel, AbstractIndex tableBlock) throws QueryExecutionException {
-    // getting the measure info which will be used while filling up measure data
-    List<ProjectionMeasure> updatedQueryMeasures = RestructureUtil
-        .createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
-            queryModel.getProjectionMeasures(),
-            tableBlock.getSegmentProperties().getMeasures(),
-            queryModel.getTable().getTableInfo().isTransactionalTable());
-    // setting the measure aggregator for all aggregation function selected
-    // in query
-    executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);
-    return updatedQueryMeasures;
-  }
-
   private int[] getComplexDimensionParentBlockIndexes(List<ProjectionDimension> queryDimensions) {
     List<Integer> parentBlockIndexList = new ArrayList<Integer>();
     for (ProjectionDimension queryDimension : queryDimensions) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
index 4b59aa7..22939e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -40,10 +39,6 @@ public class QueryExecutorProperties {
   public Map<String, Dictionary> columnToDictionaryMapping;
 
   /**
-   * Measure datatypes
-   */
-  public DataType[] measureDataTypes;
-  /**
    * all the complex dimension which is on filter
    */
   public Set<CarbonDimension> complexFilterDimension;
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index e823eb2..11b7372 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.scan.executor.infos.DimensionInfo;
 import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -63,16 +65,16 @@ public class RestructureUtil {
    * @return list of query dimension which is present in the table block
    */
   public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
-      BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
+      BlockExecutionInfo blockExecutionInfo, ProjectionDimension[] queryDimensions,
       List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
       int measureCount, boolean isTransactionalTable) {
     List<ProjectionDimension> presentDimension =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    boolean[] isDimensionExists = new boolean[queryDimensions.size()];
-    Object[] defaultValues = new Object[queryDimensions.size()];
+    boolean[] isDimensionExists = new boolean[queryDimensions.length];
+    Object[] defaultValues = new Object[queryDimensions.length];
     // create dimension information instance
     DimensionInfo dimensionInfo = new DimensionInfo(isDimensionExists, defaultValues);
-    dimensionInfo.dataType = new DataType[queryDimensions.size() + measureCount];
+    dimensionInfo.dataType = new DataType[queryDimensions.length + measureCount];
     int newDictionaryColumnCount = 0;
     int newNoDictionaryColumnCount = 0;
     // selecting only those dimension which is present in the query
@@ -412,14 +414,15 @@ public class RestructureUtil {
    * @return measures present in the block
    */
   public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
-      BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
+      BlockExecutionInfo blockExecutionInfo, ProjectionMeasure[] queryMeasures,
       List<CarbonMeasure> currentBlockMeasures, boolean isTransactionalTable) {
     MeasureInfo measureInfo = new MeasureInfo();
-    List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
-    int numberOfMeasureInQuery = queryMeasures.size();
+    List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.length);
+    int numberOfMeasureInQuery = queryMeasures.length;
     List<Integer> measureOrdinalList = new ArrayList<>(numberOfMeasureInQuery);
     Object[] defaultValues = new Object[numberOfMeasureInQuery];
     boolean[] measureExistsInCurrentBlock = new boolean[numberOfMeasureInQuery];
+    DataType[] measureDataTypes = new DataType[numberOfMeasureInQuery];
     int index = 0;
     for (ProjectionMeasure queryMeasure : queryMeasures) {
       // if query measure exists in current dimension measures
@@ -437,12 +440,14 @@ public class RestructureUtil {
           presentMeasure.add(currentBlockMeasure);
           measureOrdinalList.add(carbonMeasure.getOrdinal());
           measureExistsInCurrentBlock[index] = true;
+          measureDataTypes[index] = carbonMeasure.getDataType();
           break;
         }
       }
       if (!measureExistsInCurrentBlock[index]) {
         defaultValues[index] = getMeasureDefaultValue(queryMeasure.getMeasure().getColumnSchema(),
             queryMeasure.getMeasure().getDefaultValue());
+        measureDataTypes[index] = queryMeasure.getMeasure().getDataType();
         blockExecutionInfo.setRestructuredBlock(true);
       }
       index++;
@@ -452,7 +457,63 @@ public class RestructureUtil {
     measureInfo.setDefaultValues(defaultValues);
     measureInfo.setMeasureOrdinals(measureOrdinals);
     measureInfo.setMeasureExists(measureExistsInCurrentBlock);
+    measureInfo.setMeasureDataTypes(measureDataTypes);
     blockExecutionInfo.setMeasureInfo(measureInfo);
     return presentMeasure;
   }
+
+  /**
+   * set actual projection of blockExecutionInfo
+   */
+  public static void actualProjectionOfSegment(BlockExecutionInfo blockExecutionInfo,
+      QueryModel queryModel, SegmentProperties segmentProperties) {
+    List<ProjectionDimension> projectionDimensions = queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> projectionMeasures = queryModel.getProjectionMeasures();
+    if (queryModel.getTable().hasColumnDrift()) {
+      List<CarbonMeasure> tableBlockMeasures = segmentProperties.getMeasures();
+      List<ProjectionMeasure> updatedProjectionMeasures =
+          new ArrayList<>(projectionMeasures.size() + tableBlockMeasures.size());
+      updatedProjectionMeasures.addAll(projectionMeasures);
+      List<ProjectionDimension> updatedProjectionDimensions =
+          new ArrayList<>(projectionDimensions.size());
+      for (ProjectionDimension projectionDimension : projectionDimensions) {
+        CarbonMeasure carbonMeasure = null;
+        for (CarbonMeasure tableBlockMeasure : tableBlockMeasures) {
+          if (isColumnMatches(queryModel.getTable().isTransactionalTable(),
+              projectionDimension.getDimension(), tableBlockMeasure)) {
+            carbonMeasure = tableBlockMeasure;
+            break;
+          }
+        }
+        if (carbonMeasure != null) {
+          ProjectionMeasure projectionMeasure = new ProjectionMeasure(carbonMeasure);
+          projectionMeasure.setOrdinal(projectionDimension.getOrdinal());
+          updatedProjectionMeasures.add(projectionMeasure);
+        } else {
+          updatedProjectionDimensions.add(projectionDimension);
+        }
+      }
+      blockExecutionInfo.setActualQueryDimensions(updatedProjectionDimensions
+          .toArray(new ProjectionDimension[updatedProjectionDimensions.size()]));
+      blockExecutionInfo.setActualQueryMeasures(updatedProjectionMeasures
+          .toArray(new ProjectionMeasure[updatedProjectionMeasures.size()]));
+    } else {
+      blockExecutionInfo.setActualQueryDimensions(
+          projectionDimensions.toArray(new ProjectionDimension[projectionDimensions.size()]));
+      blockExecutionInfo.setActualQueryMeasures(
+          projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
+    }
+  }
+
+  public static boolean hasColumnDriftOnSegment(CarbonTable table,
+      SegmentProperties segmentProperties) {
+    for (CarbonDimension queryColumn : table.getColumnDrift()) {
+      for (CarbonMeasure tableColumn : segmentProperties.getMeasures()) {
+        if (isColumnMatches(table.isTransactionalTable(), queryColumn, tableColumn)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index 4f934ce..d736805 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -312,7 +312,7 @@ public class QueryModelBuilder {
     queryModel.setReadPageByPage(readPageByPage);
     queryModel.setProjection(projection);
 
-    if (table.isTransactionalTable()) {
+    if (table.isTransactionalTable() && !table.hasColumnDrift()) {
       // set the filter to the query model in order to filter blocklet before scan
       boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
index 7332614..80ec647 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
@@ -86,8 +86,8 @@ public class RestructureUtilTest {
     ProjectionMeasure queryMeasure2 = new ProjectionMeasure(new CarbonMeasure(columnSchema4, 4));
     List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2);
 
-    List<ProjectionDimension> queryDimensions =
-        Arrays.asList(queryDimension1, queryDimension2, queryDimension3);
+    ProjectionDimension[] queryDimensions =
+        new ProjectionDimension[] { queryDimension1, queryDimension2, queryDimension3 };
 
     List<ProjectionDimension> result = null;
     result = RestructureUtil
@@ -124,10 +124,11 @@ public class RestructureUtilTest {
     ProjectionMeasure queryMeasure1 = new ProjectionMeasure(carbonMeasure1);
     ProjectionMeasure queryMeasure2 = new ProjectionMeasure(carbonMeasure2);
     ProjectionMeasure queryMeasure3 = new ProjectionMeasure(carbonMeasure3);
-    List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3);
+    ProjectionMeasure[] queryMeasures =
+        new ProjectionMeasure[] { queryMeasure1, queryMeasure2, queryMeasure3 };
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
-    RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures,
-        currentBlockMeasures, true);
+    RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
+        queryMeasures, currentBlockMeasures, true);
     MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
     boolean[] measuresExist = { true, true, false };
     assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index aba0ab7..90532fb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapUtil;
@@ -54,7 +55,6 @@ import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.stats.QueryStatistic;
@@ -468,15 +468,8 @@ m filterExpression
   private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
       Expression expression, List<Segment> segmentIds) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
-    FilterResolverIntf resolver = null;
-    if (expression != null) {
-      carbonTable.processFilterExpression(expression, null, null);
-      resolver = CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
-      ExplainCollector.setFilterStatement(expression.getStatement());
-    } else {
-      ExplainCollector.setFilterStatement("none");
-    }
-
+    final DataMapFilter filter = new DataMapFilter(carbonTable, expression);
+    ExplainCollector.setFilterStatement(expression == null ? "none" : expression.getStatement());
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -487,11 +480,7 @@ m filterExpression
     List<ExtendedBlocklet> prunedBlocklets = null;
     // This is to log the event, so user will know what is happening by seeing logs.
     LOG.info("Started block pruning ...");
-    if (carbonTable.isTransactionalTable()) {
-      prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune);
-    } else {
-      prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune);
-    }
+    prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
 
     if (ExplainCollector.enabled()) {
       ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
@@ -504,15 +493,15 @@ m filterExpression
     DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
 
     // Get the available CG datamaps and prune further.
-    DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(resolver);
+    DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());
     if (cgDataMapExprWrapper != null) {
       // Prune segments from already pruned blocklets
       pruneSegments(segmentIds, prunedBlocklets);
       List<ExtendedBlocklet> cgPrunedBlocklets;
       // Again prune with CG datamap.
       if (distributedCG && dataMapJob != null) {
-        cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
-            resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune);
+        cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, filter.getResolver(),
+            segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune);
       } else {
         cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
       }
@@ -529,12 +518,12 @@ m filterExpression
     }
     // Now try to prune with FG DataMap.
     if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
-      DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(resolver);
+      DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
       if (fgDataMapExprWrapper != null) {
         // Prune segments from already pruned blocklets
         pruneSegments(segmentIds, prunedBlocklets);
         List<ExtendedBlocklet> fgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
-            resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune);
+            filter.getResolver(), segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune);
         // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
         // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
         prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets,
diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv
new file mode 100644
index 0000000..1176363
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv
@@ -0,0 +1,13 @@
+smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField
+1,1,2,1.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+1,1,2,2.1,11.12,2.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,1,2,3.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,4.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2
+2,2,3,5.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3
+2,2,3,6.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3
+4,2,3,7.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$4,a$b$1
+4,2,3,8.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc4,abcd1,abcde1,a$b$c$4,a$b$1
+4,4,1,9.1,4.12,9.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2
+4,4,1,10.1,3.12,10.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2
+1,4,1,11.1,2.12,11.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$1,a$b$3
+1,4,1,12.1,1.12,12.123,2017-03-13 00:00:04,2017-03-14,abc1,abcd4,abcde4,a$b$c$1,a$b$3
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv
new file mode 100644
index 0000000..649bbdc
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv
@@ -0,0 +1,13 @@
+smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField
+1,1,1,13.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1
+1,1,1,14.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1
+1,1,2,15.1,4.12,9.123,2017-03-11 00:00:03,2017-03-11,abc2,abcd1,abcde1,a$b$c$2,a$b$2
+1,2,2,16.1,3.12,10.123,2017-03-11 00:00:03,2017-03-11,abc1,abcd1,abcde2,a$b$c$2,a$b$2
+1,2,2,17.1,2.12,11.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde2,a$b$c$1,a$b$1
+1,2,1,18.1,1.12,12.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,1,19.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,1,20.1,11.12,2.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,2,21.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,22.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,23.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2
+2,1,1,24.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
new file mode 100644
index 0000000..bf4bae6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      "yyyy-MM-dd")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      "yyyy-MM-dd HH:mm:ss")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+    dropTable()
+    prepareTable()
+  }
+
+  override def afterAll(): Unit = {
+    dropTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+  }
+
+  private def prepareTable(): Unit = {
+    createTable(
+      "alter_sc_base",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+    )
+    createTable(
+      "alter_sc_base_complex",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField"),
+      true
+    )
+    createTable(
+      "alter_sc_validate",
+      Map("dictionary_include"->"charField"),
+      true
+    )
+    createTable(
+      "alter_sc_iud",
+      Map("dictionary_include"->"charField")
+    )
+    createTable(
+      "alter_sc_iud_complex",
+      Map("dictionary_include"->"charField"),
+      true
+    )
+    createTable(
+      "alter_sc_long_string",
+      Map("LONG_STRING_COLUMNS"->"stringField"),
+      true
+    )
+    createTable(
+      "alter_sc_insert",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+    )
+    loadData("alter_sc_insert")
+    createTable(
+      "alter_sc_insert_complex",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField"),
+      true
+    )
+    loadData("alter_sc_insert_complex")
+    createTable(
+      "alter_sc_range_column",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField", "range_column"->"smallIntField")
+    )
+    createTable(
+      "alter_sc_range_column_base",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+    )
+
+    Array("alter_sc_add_column", "alter_sc_add_column_base").foreach { tableName =>
+      sql(
+        s"""create table $tableName(
+           | smallIntField smallInt,
+           | intField int,
+           | bigIntField bigint,
+           | floatField float,
+           | doubleField double,
+           | timestampField timestamp,
+           | dateField date,
+           | stringField string
+           | )
+           | stored as carbondata
+      """.stripMargin)
+    }
+    // decimalField decimal(25, 4),
+
+    createTable(
+      "alter_sc_bloom",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+    )
+    createBloomDataMap("alter_sc_bloom", "alter_sc_bloom_dm1")
+    createTable(
+      "alter_sc_bloom_base",
+      Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+    )
+    createBloomDataMap("alter_sc_bloom_base", "alter_sc_bloom_base_dm1")
+    createTable(
+      "alter_sc_agg",
+      Map("sort_scope"->"local_sort", "sort_columns"->"intField")
+    )
+    createAggDataMap("alter_sc_agg", "alter_sc_agg_dm1")
+    createTable(
+      "alter_sc_agg_base",
+      Map("sort_scope"->"local_sort", "sort_columns"->"intField")
+    )
+    createAggDataMap("alter_sc_agg_base", "alter_sc_agg_base_dm1")
+  }
+
+  private def dropTable(): Unit = {
+    sql(s"drop table if exists alter_sc_base")
+    sql(s"drop table if exists alter_sc_base_complex")
+    sql(s"drop table if exists alter_sc_validate")
+    sql(s"drop table if exists alter_sc_iud")
+    sql(s"drop table if exists alter_sc_iud_complex")
+    sql(s"drop table if exists alter_sc_long_string")
+    sql(s"drop table if exists alter_sc_insert")
+    sql(s"drop table if exists alter_sc_insert_complex")
+    sql(s"drop table if exists alter_sc_range_column")
+    sql(s"drop table if exists alter_sc_range_column_base")
+    sql(s"drop table if exists alter_sc_add_column")
+    sql(s"drop table if exists alter_sc_add_column_base")
+    sql(s"drop table if exists alter_sc_bloom")
+    sql(s"drop table if exists alter_sc_bloom_base")
+    sql(s"drop table if exists alter_sc_agg")
+    sql(s"drop table if exists alter_sc_agg_base")
+  }
+
+  private def createTable(
+      tableName: String,
+      tblProperties: Map[String, String] = Map.empty,
+      withComplex: Boolean = false
+  ): Unit = {
+    val complexSql =
+      if (withComplex) {
+        ", arrayField array<string>, structField struct<col1:string, col2:string, col3:string>"
+      } else {
+        ""
+      }
+    val tblPropertiesSql =
+      if (tblProperties.isEmpty) {
+        ""
+      } else {
+        val propertiesString =
+          tblProperties
+            .map { entry =>
+              s"'${ entry._1 }'='${ entry._2 }'"
+            }
+            .mkString(",")
+        s"tblproperties($propertiesString)"
+      }
+
+    sql(
+      s"""create table $tableName(
+         | smallIntField smallInt,
+         | intField int,
+         | bigIntField bigint,
+         | floatField float,
+         | doubleField double,
+         | timestampField timestamp,
+         | dateField date,
+         | stringField string,
+         | varcharField varchar(10),
+         | charField char(10)
+         | $complexSql
+         | )
+         | stored as carbondata
+         | $tblPropertiesSql
+      """.stripMargin)
+    // decimalField decimal(25, 4),
+  }
+
+  private def createBloomDataMap(tableName: String, dataMapName: String): Unit = {
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $tableName
+         | USING 'bloomfilter'
+         | DMPROPERTIES(
+         | 'INDEX_COLUMNS'='smallIntField,floatField,timestampField,dateField,stringField',
+         | 'BLOOM_SIZE'='6400',
+         | 'BLOOM_FPP'='0.001',
+         | 'BLOOM_COMPRESS'='TRUE')
+       """.stripMargin)
+  }
+
+  private def createAggDataMap(tableName: String, dataMapName: String): Unit = {
+    sql(s"create datamap PreAggSum$dataMapName on table $tableName using 'preaggregate' as " +
+        s"select stringField,sum(intField) as sum from $tableName group by stringField")
+    sql(s"create datamap PreAggAvg$dataMapName on table $tableName using 'preaggregate' as " +
+        s"select stringField,avg(intField) as avg from $tableName group by stringField")
+    sql(s"create datamap PreAggCount$dataMapName on table $tableName using 'preaggregate' as " +
+        s"select stringField,count(intField) as count from $tableName group by stringField")
+    sql(s"create datamap PreAggMin$dataMapName on table $tableName using 'preaggregate' as " +
+        s"select stringField,min(intField) as min from $tableName group by stringField")
+    sql(s"create datamap PreAggMax$dataMapName on table $tableName using 'preaggregate' as " +
+        s"select stringField,max(intField) as max from $tableName group by stringField")
+  }
+
+  private def loadData(tableNames: String*): Unit = {
+    tableNames.foreach { tableName =>
+      sql(
+        s"""load data local inpath '$resourcesPath/sort_columns'
+           | into table $tableName
+           | options ('global_sort_partitions'='2', 'COMPLEX_DELIMITER_LEVEL_1'='$$', 'COMPLEX_DELIMITER_LEVEL_2'=':')
+      """.stripMargin)
+    }
+  }
+
+  private def insertData(insertTable: String, tableNames: String*): Unit = {
+    tableNames.foreach { tableName =>
+      sql(
+        s"""insert into table $tableName select * from $insertTable
+      """.stripMargin)
+    }
+  }
+
+  test("validate sort_scope and sort_columns") {
+    // invalid combination
+    var ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_SCOPE as local_sort when table has no SORT_COLUMNS"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_SCOPE as global_sort when table has no SORT_COLUMNS"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as local_sort"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' ')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as global_sort"))
+
+    sql("alter table alter_sc_validate set tblproperties('sort_columns'='stringField', 'sort_scope'='local_sort')")
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'=' ')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is LOCAL_SORT"))
+
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')")
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'='')")
+    }
+    assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is GLOBAL_SORT"))
+
+    // wrong/duplicate sort_columns
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField')")
+    }
+    assert(ex.getMessage.contains("stringField1 does not exist in table"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField, stringField1')")
+    }
+    assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField1 or it contains illegal argumnet"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField , intField, stringField')")
+    }
+    assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField or it contains illegal argumnet"))
+
+    // not supported data type
+//    ex = intercept[RuntimeException] {
+//      sql("alter table alter_sc_validate set tblproperties('sort_columns'='decimalField')")
+//    }
+//    assert(ex.getMessage.contains("sort_columns is unsupported for DECIMAL data type column: decimalField"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'='doubleField')")
+    }
+    assert(ex.getMessage.contains("sort_columns is unsupported for DOUBLE datatype column: doubleField"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'='arrayField')")
+    }
+    assert(ex.getMessage.contains("sort_columns is unsupported for ARRAY datatype column: arrayField"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField')")
+    }
+    assert(ex.getMessage.contains("sort_columns is unsupported for STRUCT datatype column: structField"))
+
+    ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField.col1')")
+    }
+    assert(ex.getMessage.contains("sort_columns: structField.col1 does not exist in table"))
+  }
+
+  test("long string column") {
+    val ex = intercept[RuntimeException] {
+      sql("alter table alter_sc_long_string set tblproperties('sort_columns'='intField, stringField')")
+    }
+    assert(ex.getMessage.contains("sort_columns is unsupported for long string datatype column: stringField"))
+  }
+
+  test("describe formatted") {
+    // valid combination
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='')")
+    checkExistence(sql("describe formatted alter_sc_validate"), true, "NO_SORT")
+
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='bigIntField,stringField')")
+    checkExistence(sql("describe formatted alter_sc_validate"), true, "no_sort", "bigIntField, stringField".toLowerCase())
+
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='stringField,bigIntField')")
+    checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "stringField, bigIntField".toLowerCase())
+
+    // global dictionary or direct dictionary
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' charField , bigIntField , timestampField ')")
+    checkExistence(sql("describe formatted alter_sc_validate"), true, "global_sort", "charField, bigIntField, timestampField".toLowerCase())
+
+    // supported data type
+    sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField')")
+    checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField".toLowerCase())
+  }
+
+  test("IUD and Query") {
+    testIUDAndQuery("alter_sc_iud", "alter_sc_base", "alter_sc_insert")
+  }
+
+  test("IUD and Query with complex data type") {
+    testIUDAndQuery("alter_sc_iud_complex", "alter_sc_base_complex", "alter_sc_insert_complex")
+  }
+
+  private def testIUDAndQuery(tableName: String, baseTableName: String, insertTableName: String): Unit = {
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // alter table to local_sort with new SORT_COLUMNS
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField, intField, stringField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // alter table to revert SORT_COLUMNS
+    sql(s"alter table $tableName set tblproperties('sort_columns'='stringField, intField, timestampField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // alter table to change SORT_COLUMNS
+    sql(s"alter table $tableName set tblproperties('sort_columns'='smallIntField, stringField, intField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // alter table to change SORT_SCOPE and SORT_COLUMNS
+    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // alter table to change SORT_SCOPE
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // query
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
+    // set input segments
+    (0 to 5).foreach { segment =>
+      sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
+      sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+      checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    }
+    sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
+    sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
+
+    // delete
+    sql(s"delete from $tableName where smallIntField = 2")
+    sql(s"delete from $baseTableName where smallIntField = 2")
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    sql(s"delete from $tableName")
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(0)))
+    sql(s"delete from $baseTableName")
+    checkAnswer(sql(s"select count(*) from $baseTableName"), Seq(Row(0)))
+
+    // insert & load data
+    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='timestampField')")
+    insertData(insertTableName, tableName, baseTableName)
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    sql(s"alter table $tableName set tblproperties('sort_scope'='no_sort', 'sort_columns'='')")
+    insertData(insertTableName, tableName, baseTableName)
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+    insertData(insertTableName, tableName, baseTableName)
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // update
+    sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
+    sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+    // query
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
+    // set input segments
+    (6 to 11).foreach { segment =>
+      sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
+      sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+      checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    }
+    sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
+    sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
+
+    // compaction
+    sql(s"show segments for table $tableName").show(100, false)
+    sql(s"show segments for table $baseTableName").show(100, false)
+    sql(s"alter table $tableName compact 'minor'")
+    sql(s"alter table $baseTableName compact 'minor'")
+    sql(s"show segments for table $tableName").show(100, false)
+    sql(s"show segments for table $baseTableName").show(100, false)
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+  }
+
+  test("range column") {
+    val tableName = "alter_sc_range_column"
+    val baseTableName = "alter_sc_range_column_base"
+    loadData(tableName, baseTableName)
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')")
+    loadData(tableName, baseTableName)
+
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+  }
+
+  test("add/drop column for sort_columns") {
+    val tableName = "alter_sc_add_column"
+    val baseTableName = "alter_sc_add_column_base"
+    loadData(tableName, baseTableName)
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, stringField')")
+    loadData(tableName, baseTableName)
+    // add column
+    sql(s"alter table $tableName add columns( varcharField varchar(10), charField char(10))")
+    sql(s"alter table $baseTableName add columns( varcharField varchar(10), charField char(10))")
+    loadData(tableName, baseTableName)
+
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+
+    // add new column to sort_columns
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+
+    // drop column of old sort_columns
+    sql(s"alter table $tableName drop columns(stringField)")
+    sql(s"alter table $baseTableName drop columns(stringField)")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+  }
+
+  test("bloom filter") {
+    val tableName = "alter_sc_bloom"
+    val dataMapName = "alter_sc_bloom_dm1"
+    val baseTableName = "alter_sc_bloom_base"
+    loadData(tableName, baseTableName)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "bloomfilter", dataMapName)
+    checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName)
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField"))
+
+    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')")
+    loadData(tableName, baseTableName)
+    checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName)
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField"))
+  }
+
+  test("pre-aggregate") {
+    val tableName = "alter_sc_agg"
+    val dataMapName = "alter_sc_agg_dm1"
+    val baseTableName = "alter_sc_agg_base"
+    loadData(tableName, baseTableName)
+    sql(s"SHOW DATAMAP ON TABLE $tableName").show(100, false)
+    checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "preaggregate", dataMapName)
+    checkExistence(sql(s"EXPLAIN select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
+    checkAnswer(sql(s"select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,sum(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField"))
+
+    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')")
+    loadData(tableName, baseTableName)
+    sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField").show(100, false)
+    checkExistence(sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
+    checkAnswer(sql(s"select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,max(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField"))
+  }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6cee8dc..d0ed815 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -232,7 +232,11 @@ class CarbonScanRDD[T: ClassTag](
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
-      val carbonDistribution = if (directFill) {
+      // When the table has column drift, it means different blocks maybe have different schemas.
+      // the query doesn't support to scan the blocks with different schemas in a task.
+      // So if the table has the column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and
+      // CARBON_TASK_DISTRIBUTION_CUSTOM can't work.
+      val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) {
         CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
       } else {
         CarbonProperties.getInstance().getProperty(
@@ -260,7 +264,7 @@ class CarbonScanRDD[T: ClassTag](
             CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
             "false").toBoolean ||
           carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
-        if (useCustomDistribution) {
+        if (useCustomDistribution && !tableInfo.hasColumnDrift) {
           // create a list of block based on split
           val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
 
@@ -297,7 +301,7 @@ class CarbonScanRDD[T: ClassTag](
             val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
             result.add(partition)
           }
-        } else if (carbonDistribution.equalsIgnoreCase(
+        } else if (!tableInfo.hasColumnDrift && carbonDistribution.equalsIgnoreCase(
             CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {
 
           // sort blocks in reverse order of length
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d90c6b2..da42363 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.spark.util
 
-
 import java.io.File
 import java.math.BigDecimal
 import java.text.SimpleDateFormat
@@ -47,16 +46,16 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 
-
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -794,6 +793,7 @@ object CommonUtil {
     }
     storeLocation
   }
+
   /**
    * This method will validate the cache level
    *
@@ -909,6 +909,80 @@ object CommonUtil {
     }
   }
 
+  def isDataTypeSupportedForSortColumn(columnDataType: String): Boolean = {
+    val dataTypes = Array("array", "struct", "map", "double", "float", "decimal", "binary")
+    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+  }
+
+  def validateSortScope(newProperties: Map[String, String]): Unit = {
+    val sortScopeOption = newProperties.get(CarbonCommonConstants.SORT_SCOPE)
+    if (sortScopeOption.isDefined) {
+      if (!CarbonUtil.isValidSortOption(sortScopeOption.get)) {
+        throw new MalformedCarbonCommandException(
+          s"Invalid SORT_SCOPE ${ sortScopeOption.get }, " +
+          s"valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT'")
+      }
+    }
+  }
+
+  def validateSortColumns(
+      sortKey: Array[String],
+      fields: Seq[(String, String)],
+      varcharCols: Seq[String]
+  ): Unit = {
+    if (sortKey.diff(sortKey.distinct).length > 0 ||
+        (sortKey.length > 1 && sortKey.contains(""))) {
+      throw new MalformedCarbonCommandException(
+        "SORT_COLUMNS Either having duplicate columns : " +
+        sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
+    }
+
+    sortKey.foreach { column =>
+      if (!fields.exists(x => x._1.equalsIgnoreCase(column))) {
+        val errorMsg = "sort_columns: " + column +
+                       " does not exist in table. Please check the create table statement."
+        throw new MalformedCarbonCommandException(errorMsg)
+      } else {
+        val dataType = fields.find(x =>
+          x._1.equalsIgnoreCase(column)).get._2
+        if (isDataTypeSupportedForSortColumn(dataType)) {
+          val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
+          throw new MalformedCarbonCommandException(errorMsg)
+        }
+        if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
+          throw new MalformedCarbonCommandException(
+            s"sort_columns is unsupported for long string datatype column: $column")
+        }
+      }
+    }
+  }
+
+  def validateSortColumns(carbonTable: CarbonTable, newProperties: Map[String, String]): Unit = {
+    val fields = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+    val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
+    var sortKeyOption = newProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    val varcharColsString = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+    val varcharCols: Seq[String] = if (varcharColsString == null) {
+      Seq.empty[String]
+    } else {
+      varcharColsString.split(",").map(_.trim)
+    }
+
+    if (!sortKeyOption.isDefined) {
+      // default no columns are selected for sorting in no_sort scope
+      sortKeyOption = Some("")
+    }
+    val sortKeyString = CarbonUtil.unquoteChar(sortKeyOption.get).trim
+    if (!sortKeyString.isEmpty) {
+      val sortKey = sortKeyString.split(',').map(_.trim)
+      validateSortColumns(
+        sortKey,
+        fields.map { field => (field.getColName, field.getDataType.getName) },
+        varcharCols
+      )
+    }
+  }
+
   def bytesToDisplaySize(size: Long): String = bytesToDisplaySize(BigDecimal.valueOf(size))
 
   // This method converts the bytes count to display size upto 2 decimal places
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3e80ea6..d978128 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -760,32 +760,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     var sortKeyDimsTmp: Seq[String] = Seq[String]()
     if (!sortKeyString.isEmpty) {
       val sortKey = sortKeyString.split(',').map(_.trim)
-      if (sortKey.diff(sortKey.distinct).length > 0 ||
-          (sortKey.length > 1 && sortKey.contains(""))) {
-        throw new MalformedCarbonCommandException(
-          "SORT_COLUMNS Either having duplicate columns : " +
-          sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
-      }
-
-      sortKey.foreach { column =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
-          val errorMsg = "sort_columns: " + column +
-            " does not exist in table. Please check the create table statement."
-          throw new MalformedCarbonCommandException(errorMsg)
-        } else {
-          val dataType = fields.find(x =>
-            x.column.equalsIgnoreCase(column)).get.dataType.get
-          if (isDataTypeSupportedForSortColumn(dataType)) {
-            val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
-            throw new MalformedCarbonCommandException(errorMsg)
-          }
-          if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
-            throw new MalformedCarbonCommandException(
-              s"sort_columns is unsupported for long string datatype column: $column")
-          }
-        }
-      }
-
+      CommonUtil.validateSortColumns(
+        sortKey,
+        fields.map { field => (field.column, field.dataType.get) },
+        varcharCols
+      )
       sortKey.foreach { dimension =>
         if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase)) {
           fields.foreach { field =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 1dc562dc..99bc863 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -32,13 +33,14 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -101,6 +103,76 @@ object AlterTableUtil {
   }
 
   /**
+   * update schema when SORT_COLUMNS are be changed
+   */
+  private def updateSchemaForSortColumns(
+      thriftTable: TableInfo,
+      lowerCasePropertiesMap: mutable.Map[String, String],
+      schemaConverter: SchemaConverter
+  ) = {
+    val sortColumnsOption = lowerCasePropertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
+    if (sortColumnsOption.isDefined) {
+      val sortColumnsString = CarbonUtil.unquoteChar(sortColumnsOption.get).trim
+      val columns = thriftTable.getFact_table.getTable_columns
+      // remove old sort_columns property from ColumnSchema
+      val columnSeq =
+        columns
+          .asScala
+          .map { column =>
+            val columnProperties = column.getColumnProperties
+            if (columnProperties != null) {
+              columnProperties.remove(CarbonCommonConstants.SORT_COLUMNS)
+            }
+            column
+          }
+          .zipWithIndex
+      if (!sortColumnsString.isEmpty) {
+        val newSortColumns = sortColumnsString.split(',').map(_.trim)
+        // map sort_columns index in column list
+        val sortColumnsIndexMap = newSortColumns
+          .zipWithIndex
+          .map { entry =>
+            val column = columnSeq.find(_._1.getColumn_name.equalsIgnoreCase(entry._1)).get
+            var columnProperties = column._1.getColumnProperties
+            if (columnProperties == null) {
+              columnProperties = new util.HashMap[String, String]()
+              column._1.setColumnProperties(columnProperties)
+            }
+            // change sort_columns to dimension
+            if (!column._1.isDimension) {
+              column._1.setDimension(true)
+              columnProperties.put(CarbonCommonConstants.COLUMN_DRIFT, "true")
+            }
+            // add sort_columns property
+            columnProperties.put(CarbonCommonConstants.SORT_COLUMNS, "true")
+            (column._2, entry._2)
+          }
+          .toMap
+        var index = newSortColumns.length
+        // re-order all columns, move sort_columns to the head of column list
+        val newColumns = columnSeq
+          .map { entry =>
+            val sortColumnIndexOption = sortColumnsIndexMap.get(entry._2)
+            val newIndex = if (sortColumnIndexOption.isDefined) {
+              sortColumnIndexOption.get
+            } else {
+              val tempIndex = index
+              index += 1
+              tempIndex
+            }
+            (newIndex, entry._1)
+          }
+          .sortWith(_._1 < _._1)
+          .map(_._2)
+          .asJava
+        // use new columns
+        columns.clear()
+        columns.addAll(newColumns)
+      }
+    }
+  }
+
+  /**
    * @param carbonTable
    * @param schemaEvolutionEntry
    * @param thriftTable
@@ -361,9 +433,10 @@ object AlterTableUtil {
       // validate the range column properties
       validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap)
 
-      // validate the Sort Scope
-      validateSortScopeProperty(carbonTable, lowerCasePropertiesMap)
-
+      // validate the Sort Scope and Sort Columns
+      validateSortScopeAndSortColumnsProperties(carbonTable, lowerCasePropertiesMap)
+      // if SORT_COLUMN is changed, it will move them to the head of column list
+      updateSchemaForSortColumns(thriftTable, lowerCasePropertiesMap, schemaConverter)
       // below map will be used for cache invalidation. As tblProperties map is getting modified
       // in the next few steps the original map need to be retained for any decision making
       val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -394,9 +467,13 @@ object AlterTableUtil {
             if (propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
               tblPropertiesMap
                 .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
-            } else if (propKey.equalsIgnoreCase("sort_scope")) {
+            } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_SCOPE)) {
               tblPropertiesMap
                 .put(propKey.toLowerCase, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+            } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS)) {
+              val errorMessage = "Error: Invalid option(s): " + propKey +
+                                 ", please set SORT_COLUMNS as empty instead of unset"
+              throw new MalformedCarbonCommandException(errorMessage)
             } else {
               tblPropertiesMap.remove(propKey.toLowerCase)
             }
@@ -440,7 +517,8 @@ object AlterTableUtil {
       "LOCAL_DICTIONARY_EXCLUDE",
       "LOAD_MIN_SIZE_INMB",
       "RANGE_COLUMN",
-      "SORT_SCOPE")
+      "SORT_SCOPE",
+      "SORT_COLUMNS")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
@@ -542,18 +620,34 @@ object AlterTableUtil {
     }
   }
 
-  def validateSortScopeProperty(carbonTable: CarbonTable,
+  def validateSortScopeAndSortColumnsProperties(carbonTable: CarbonTable,
       propertiesMap: mutable.Map[String, String]): Unit = {
-    propertiesMap.foreach { property =>
-      if (property._1.equalsIgnoreCase("SORT_SCOPE")) {
-        if (!CarbonUtil.isValidSortOption(property._2)) {
-          throw new MalformedCarbonCommandException(
-            s"Invalid SORT_SCOPE ${ property._2 }, valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', " +
-            s"'LOCAL_SORT' and 'GLOBAL_SORT'")
-        } else if (!property._2.equalsIgnoreCase("NO_SORT") &&
-                   (carbonTable.getNumberOfSortColumns == 0)) {
+    CommonUtil.validateSortScope(propertiesMap)
+    CommonUtil.validateSortColumns(carbonTable, propertiesMap)
+    // match SORT_SCOPE and SORT_COLUMNS
+    val newSortScope = propertiesMap.get(CarbonCommonConstants.SORT_SCOPE)
+    val newSortColumns = propertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
+    if (newSortScope.isDefined) {
+      // 1. check SORT_COLUMNS when SORT_SCOPE is not changed to NO_SORT
+      if (!SortScope.NO_SORT.name().equalsIgnoreCase(newSortScope.get)) {
+        if (newSortColumns.isDefined) {
+          if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) {
+            throw new InvalidConfigurationException(
+              s"Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as ${newSortScope.get} ")
+          }
+        } else {
+          if (carbonTable.getNumberOfSortColumns == 0) {
+            throw new InvalidConfigurationException(
+              s"Cannot set SORT_SCOPE as ${newSortScope.get} when table has no SORT_COLUMNS")
+          }
+        }
+      }
+    } else if (newSortColumns.isDefined) {
+      // 2. check SORT_SCOPE when SORT_COLUMNS is changed to empty
+      if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) {
+        if (!SortScope.NO_SORT.equals(carbonTable.getSortScope)) {
           throw new InvalidConfigurationException(
-            s"Cannot set SORT_SCOPE as ${ property._2 } when table has no SORT_COLUMNS")
+            s"Cannot set SORT_COLUMNS as empty when SORT_SCOPE is ${carbonTable.getSortScope} ")
         }
       }
     }