You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/31 15:50:44 UTC

[1/3] incubator-carbondata git commit: sort columns

Repository: incubator-carbondata
Updated Branches:
  refs/heads/12-dev 3b62d25cf -> 5763f8c60


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3346743..30e03ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -52,6 +52,7 @@ case class TableModel(
     tableProperties: Map[String, String],
     dimCols: Seq[Field],
     msrCols: Seq[Field],
+    sortKeyDims: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
@@ -357,6 +358,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
     columnSchema.setSchemaOrdinal(schemaOrdinal)
+    columnSchema.setSortColumn(false)
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
     columnSchema
@@ -367,7 +369,11 @@ class TableNewProcessor(cm: TableModel) {
     val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
     var allColumns = Seq[ColumnSchema]()
     var index = 0
-    cm.dimCols.foreach(field => {
+    var measureCount = 0
+
+    // Sort columns should be at the beginning of all columns
+    cm.sortKeyDims.get.foreach { keyDim =>
+      val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = new java.util.ArrayList[Encoding]()
       encoders.add(Encoding.DICTIONARY)
       val columnSchema: ColumnSchema = getColumnSchema(
@@ -381,11 +387,33 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      allColumns ++= Seq(columnSchema)
+      columnSchema.setSortColumn(true)
+      allColumns :+= columnSchema
       index = index + 1
-      if (field.children.isDefined && field.children.get != null) {
-        columnSchema.setNumberOfChild(field.children.get.size)
-        allColumns ++= getAllChildren(field.children)
+    }
+
+    cm.dimCols.foreach(field => {
+      val sortField = cm.sortKeyDims.get.find(field.column equals _)
+      if (sortField.isEmpty) {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        val columnSchema: ColumnSchema = getColumnSchema(
+          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column),
+          index,
+          isCol = true,
+          encoders,
+          isDimensionCol = true,
+          -1,
+          field.precision,
+          field.scale,
+          field.schemaOrdinal)
+        allColumns :+= columnSchema
+        index = index + 1
+        if (field.children.isDefined && field.children.get != null) {
+          columnSchema.setNumberOfChild(field.children.get.size)
+          allColumns ++= getAllChildren(field.children)
+        }
       }
     })
 
@@ -402,10 +430,9 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      val measureCol = columnSchema
-
-      allColumns ++= Seq(measureCol)
+      allColumns :+= columnSchema
       index = index + 1
+      measureCount += 1
     })
 
     // Check if there is any duplicate measures or dimensions.
@@ -426,22 +453,6 @@ class TableNewProcessor(cm: TableModel) {
 
     updateColumnGroupsInFields(cm.columnGroups, allColumns)
 
-    var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    for (column <- allColumns) {
-      if (highCardinalityDims.contains(column.getColumnName)) {
-        newOrderedDims += column
-      } else if (column.isComplex) {
-        complexDims += column
-      } else if (column.isDimensionColumn) {
-        newOrderedDims += column
-      } else {
-        measures += column
-      }
-
-    }
-
     // Setting the boolean value of useInvertedIndex in column schema
     val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
     for (column <- allColumns) {
@@ -456,7 +467,7 @@ class TableNewProcessor(cm: TableModel) {
     }
 
     // Adding dummy measure if no measure is provided
-    if (measures.size < 1) {
+    if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
       val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
@@ -466,13 +477,10 @@ class TableNewProcessor(cm: TableModel) {
         false,
         -1, 0, 0, schemaOrdinal = -1)
       columnSchema.setInvisible(true)
-      val measureColumn = columnSchema
-      measures += measureColumn
-      allColumns = allColumns ++ measures
+      allColumns :+= columnSchema
     }
     val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
     columnValidator.validateColumns(allColumns)
-    newOrderedDims = newOrderedDims ++ complexDims ++ measures
 
     val tableInfo = new TableInfo()
     val tableSchema = new TableSchema()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
index 8588868..b8f0a7c 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
@@ -32,23 +32,15 @@ private class TestCarbonSqlParserStub extends CarbonSqlParser {
 
   def updateColumnGroupsInFieldTest(fields: Seq[Field], tableProperties: Map[String, String]): Seq[String] = {
 
-     var (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
-      fields, tableProperties)
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+     var (dims, msrs, noDictionaryDims, sortkey) = extractDimAndMsrFields(fields, tableProperties)
 
     updateColumnGroupsInField(tableProperties,
         noDictionaryDims, msrs, dims)
   }
 
-  def extractDimColsAndNoDictionaryFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field],
-    Seq[String]) = {
-
-    extractDimColsAndNoDictionaryFields(fields, tableProperties)
-  }
-
-  def extractMsrColsFromFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field]) = {
-
-    extractMsrColsFromFields(fields, tableProperties)
+  def extractDimAndMsrFieldsTest(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+    extractDimAndMsrFields(fields, tableProperties)
   }
 
 
@@ -199,7 +191,7 @@ class TestCarbonSqlParser extends QueryTest {
     val fields: Seq[Field] = loadAllFields
 
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub.extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
+    val (dimCols, _, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
 
@@ -219,9 +211,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -242,9 +232,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -264,9 +252,8 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields,
+      tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -287,9 +274,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col3", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -310,9 +295,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -333,9 +316,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -358,9 +339,7 @@ class TestCarbonSqlParser extends QueryTest {
     )
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -382,9 +361,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE-> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -402,10 +379,11 @@ class TestCarbonSqlParser extends QueryTest {
 
   // Testing the extracting of measures
   test("Test-extractMsrColsFromFields") {
-    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2",
+      CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (_, msrCols, _, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
     assert(msrCols.lift(0).get.column.equalsIgnoreCase("col4"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index b848543..b4b462a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -28,18 +28,25 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
 
+  boolean[] isMeasure;
+
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
     //can initialize and generate schema here.
+    isMeasure = new boolean[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      isMeasure[i] = !carbonColumns[i].isDimesion();
+      dataTypes[i] = carbonColumns[i].getDataType();
+    }
   }
 
   @Override public InternalRow readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
+    for (int i = 0; i < isMeasure.length; i++) {
       if (data[i] == null) {
         continue;
       }
-      if (dictionaries[i] == null) {
+      if (isMeasure[i]) {
         if (dataTypes[i].equals(DataType.INT)) {
           data[i] = ((Long)(data[i])).intValue();
         } else if (dataTypes[i].equals(DataType.SHORT)) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 2a9c701..27bbc2a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.vectorreader;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
@@ -30,6 +31,14 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     this.columnVector = columnVector;
   }
 
+  @Override public void putBoolean(int rowId, boolean value) {
+    columnVector.putBoolean(rowId, value);
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    columnVector.putFloat(rowId, value);
+  }
+
   @Override public void putShort(int rowId, short value) {
     columnVector.putShort(rowId, value);
   }
@@ -112,4 +121,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   @Override public void reset() {
 //    columnVector.reset();
   }
+
+  @Override public DataType getType() {
+    return columnVector.dataType();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index f8bdcf8..9321706 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -51,18 +51,49 @@ object TableCreator {
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
   }
 
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-                                                    tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
+  protected def extractDimAndMsrFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
+    // All columns in sortkey should be there in create table cols
+    val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortKeyDimsTmp: Seq[String] = Seq[String]()
+    if (sortKeyOption.isDefined) {
+      var sortKey = sortKeyOption.get.split(',').map(_.trim)
+      sortKey.foreach { column =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
+          val errormsg = "sort_columns: " + column +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        } else {
+          val dataType = fields.find(x =>
+            x.column.equalsIgnoreCase(column)).get.dataType.get
+          if (isComplexDimDictionaryExclude(dataType)) {
+            val errormsg = "sort_columns is unsupported for complex datatype column: " + column
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+      }
+
+      sortKey.foreach { dimension =>
+        if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) {
+          fields.foreach { field =>
+            if (field.column.equalsIgnoreCase(dimension)) {
+              sortKeyDimsTmp :+= field.column
+            }
+          }
+        }
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
       dictExcludeCols
         .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
@@ -97,7 +128,7 @@ object TableCreator {
       }
     }
 
-    // include cols should contain exclude cols
+    // include cols should not contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
@@ -108,11 +139,10 @@ object TableCreator {
 
     // by default consider all String cols as dims and if any dictionary exclude is present then
     // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach(field => {
-
+    fields.foreach { field =>
       if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP &&
-            DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.DATE) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE) {
           noDictionaryDims :+= field.column
         }
         dimFields += field
@@ -120,49 +150,30 @@ object TableCreator {
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
+      } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) {
+        noDictionaryDims :+= field.column
+        dimFields += field
+      } else {
+        msrFields :+= field
       }
     }
-    )
-
-    (dimFields.toSeq, noDictionaryDims)
-  }
-
-  /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-                                         tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
-    }
 
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-          !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
+    var sortKeyDims = sortKeyDimsTmp
+    if (sortKeyOption.isEmpty) {
+      // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS.
+      dimFields.foreach { field =>
+        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+          sortKeyDims :+= field.column
         }
       }
-    })
-
-    msrFields
+    }
+    if (sortKeyDims.isEmpty) {
+      // no SORT_COLUMNS
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "")
+    } else {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
+    }
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
   }
 
   def getKey(parentColumnName: Option[String],
@@ -440,27 +451,24 @@ object TableCreator {
   }
 
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-                        , tableName: String, fields: Seq[Field],
-                        partitionCols: Seq[PartitionerField],
-                        bucketFields: Option[BucketFields],
-                        tableProperties: Map[String, String]): TableModel
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      bucketFields: Option[BucketFields],
+      tableProperties: Map[String, String]): TableModel
   = {
 
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+    fields.zipWithIndex.foreach { x =>
+      x._1.schemaOrdinal = x._2
+    }
+    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
       fields, tableProperties)
     if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-        +
-        " can not be created without key columns. Please " +
-        "use DICTIONARY_INCLUDE or " +
-        "DICTIONARY_EXCLUDE to set at least one key " +
-        "column " +
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
         "if all specified columns are numeric types")
     }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -474,18 +482,20 @@ object TableCreator {
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
-    TableModel(ifNotExistPresent,
+    TableModel(
+      ifNotExistPresent,
       dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
       tableName,
       tableProperties,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
+      Option(sortKeyDims),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields)
+      bucketFields: Option[BucketFields])
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 0bd3e45..407ac2f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -76,6 +76,10 @@ public class CarbonDataLoadConfiguration {
 
   private DictionaryCardinalityFinder cardinalityFinder;
 
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -121,6 +125,22 @@ public class CarbonDataLoadConfiguration {
     return dimCount;
   }
 
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  public int getNumberOfSortColumns() {
+    return this.numberOfSortColumns;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return this.numberOfNoDictSortColumns;
+  }
+
   public int getComplexDimensionCount() {
     int dimCount = 0;
     for (int i = 0; i < dataFields.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 4ebb2fb..1932888 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -195,6 +195,8 @@ public final class DataLoadProcessBuilder {
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
     configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
     configuration.setPreFetch(loadModel.isPreFetch());
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
 
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 9e4b50d..3accb0b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.processing.newflow.converter.impl;
 
-import java.nio.charset.Charset;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -48,23 +46,24 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
     this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
     if (dimensionValue == null || dimensionValue.equals(nullformat)) {
-      dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-    }
-    if (dataType != DataType.STRING) {
-      if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
-        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+    } else {
+      try {
+        row.update(DataTypeUtil
+            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType), index);
+      } catch (Throwable ex) {
+        if (dimensionValue.length() != 0 || isEmptyBadRecord) {
           logHolder.setReason(
               "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
                   .getColName() + " and column data type " + dataType + " is not a valid "
                   + dataType + " type.");
+        } else {
+          row.update(new byte[0], index);
         }
       }
     }
-    row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
-        index);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index ad96578..a0e4ef1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -85,7 +85,8 @@ public class ParallelReadMergeSorterImpl implements Sorter {
             sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
             sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
-            sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getNoDictionarySortColumn());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e3049d2..430bf1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -139,7 +139,8 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
             sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            this.sortParameters.getNoDictionarySortColumn());
     return finalMerger;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index e468028..61c6cca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -34,6 +34,8 @@ public class UnsafeCarbonRowPage {
 
   private boolean[] noDictionaryDimensionMapping;
 
+  private boolean[] noDictionarySortColumnMapping;
+
   private int dimensionSize;
 
   private int measureSize;
@@ -52,9 +54,11 @@ public class UnsafeCarbonRowPage {
 
   private boolean saveToDisk;
 
-  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, int dimensionSize,
-      int measureSize, char[] aggType, MemoryBlock memoryBlock, boolean saveToDisk) {
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+      MemoryBlock memoryBlock, boolean saveToDisk) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
     this.measureSize = measureSize;
     this.aggType = aggType;
@@ -324,4 +328,7 @@ public class UnsafeCarbonRowPage {
     return noDictionaryDimensionMapping;
   }
 
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 9907509..1dc980f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -108,6 +108,7 @@ public class UnsafeSortDataRows {
   public void initialize() throws CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
         parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
         !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
@@ -171,6 +172,7 @@ public class UnsafeSortDataRows {
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
             rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+                parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount(),
                 parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
             bytesAdded += rowPage.addRow(rowBatch[i]);
@@ -198,12 +200,12 @@ public class UnsafeSortDataRows {
     if (this.rowPage.getUsedSize() > 0) {
       TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
           new UnsafeIntSortDataFormat(rowPage));
-      if (parameters.getNoDictionaryCount() > 0) {
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), rowPage));
+            new UnsafeRowComparatorForNormalDIms(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -288,12 +290,13 @@ public class UnsafeSortDataRows {
         long startTime = System.currentTimeMillis();
         TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
             new UnsafeIntSortDataFormat(page));
-        if (parameters.getNoDictionaryCount() > 0) {
+        // if there are no-dictionary sort columns, use the comparator that handles them
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), page));
+              new UnsafeRowComparatorForNormalDIms(page));
         }
         if (rowPage.isSaveToDisk()) {
           // create a new file every time

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
index e61a284..476b8ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -27,14 +27,14 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
 
   /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   * Mapping of the sort columns: true for a no-dictionary column, false for a dictionary column.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   private Object baseObject;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionaryColMaping = rowPage.getNoDictionaryDimensionMapping();
+    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
   }
 
@@ -47,7 +47,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA);
         sizeA += 2;
@@ -89,7 +89,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObjectL, rowA + sizeA);
         sizeA += 2;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
index 7448aee..4fd245f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -27,11 +27,11 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
 
   private Object baseObject;
 
-  private int dimCount;
+  private int numberOfSortColumns;
 
-  public UnsafeRowComparatorForNormalDIms(int dimCount, UnsafeCarbonRowPage rowPage) {
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
     this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.dimCount = dimCount;
+    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
   }
 
   /**
@@ -43,7 +43,7 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (int i = 0; i < dimCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
       int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
       sizeA += 4;
       int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index ed9e0a6..397de63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -45,13 +45,13 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
   private int columnSize;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictMapping, int columnSize) {
+      boolean[] noDictSortColumnMapping, int columnSize) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(noDictMapping);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
index f491623..048f4f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -41,11 +41,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
   private int columnSize;
 
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) {
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping());
+    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 60f259e..6d04ebf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -153,7 +153,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.aggType = parameters.getAggType();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
-    comparator = new NewRowComparator(isNoDictionaryDimensionColumn);
+    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index b98a072..ab59395 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -126,7 +126,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
         SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
             parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                .getMeasureColCount());
+                .getMeasureColCount(), parameters.getNumberOfSortColumns());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -137,7 +137,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
                     .getMeasureColCount());
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 0ac2d5c..dbddc1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
+              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.getNoDictionarySortColumn());
 
       // initialize
       sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
index dd9358c..247251e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
@@ -24,15 +24,15 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 public class NewRowComparator implements Comparator<Object[]> {
 
   /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   * Mapping of the sort columns: true for a no-dictionary column, false for a dictionary column.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   /**
-   * @param noDictionaryColMaping
+   * @param noDictionarySortColumnMaping
    */
-  public NewRowComparator(boolean[] noDictionaryColMaping) {
-    this.noDictionaryColMaping = noDictionaryColMaping;
+  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
   }
 
   /**
@@ -43,7 +43,7 @@ public class NewRowComparator implements Comparator<Object[]> {
 
     int index = 0;
 
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
 
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) rowA[index];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
index d913b32..241882e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
@@ -26,15 +26,15 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   /**
    * dimension count
    */
-  private int dimensionCount;
+  private int numberOfSortColumns;
 
   /**
    * RowComparatorForNormalDims Constructor
    *
-   * @param dimensionCount
+   * @param numberOfSortColumns
    */
-  public NewRowComparatorForNormalDims(int dimensionCount) {
-    this.dimensionCount = dimensionCount;
+  public NewRowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
   }
 
   /**
@@ -45,7 +45,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
 
-    for (int i = 0; i < dimensionCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
 
       int dimFieldA = (int)rowA[i];
       int dimFieldB = (int)rowB[i];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
index c282f52..2584048 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
@@ -33,15 +33,15 @@ public class RowComparator implements Comparator<Object[]> {
   /**
    * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMaping;
 
   /**
-   * @param noDictionaryColMaping
+   * @param noDictionarySortColumnMaping
    * @param noDictionaryCount
    */
-  public RowComparator(boolean[] noDictionaryColMaping, int noDictionaryCount) {
+  public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
     this.noDictionaryCount = noDictionaryCount;
-    this.noDictionaryColMaping = noDictionaryColMaping;
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
   }
 
   /**
@@ -53,7 +53,7 @@ public class RowComparator implements Comparator<Object[]> {
     int normalIndex = 0;
     int noDictionaryindex = 0;
 
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
 
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) rowA[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()];

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
index ceaf5c6..8d914ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
@@ -28,15 +28,15 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> {
   /**
    * dimension count
    */
-  private int dimensionCount;
+  private int numberOfSortColumns;
 
   /**
    * RowComparatorForNormalDims Constructor
    *
-   * @param dimensionCount
+   * @param numberOfSortColumns
    */
-  public RowComparatorForNormalDims(int dimensionCount) {
-    this.dimensionCount = dimensionCount;
+  public RowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
   }
 
   /**
@@ -47,7 +47,7 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
 
-    for (int i = 0; i < dimensionCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
 
       int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
       int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 9b5a850..949d8f9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -191,11 +191,10 @@ public class SortDataRows {
       Object[][] toSort;
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
-
-      if (parameters.getNoDictionaryCount() > 0) {
-        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
       } else {
-        Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount()));
+        Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
       }
       recordHolderList = toSort;
 
@@ -385,12 +384,12 @@ public class SortDataRows {
     @Override public Void call() throws Exception {
       try {
         long startTime = System.currentTimeMillis();
-        if (parameters.getNoDictionaryCount() > 0) {
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
           Arrays.sort(recordHolderArray,
-              new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+              new NewRowComparator(parameters.getNoDictionarySortColumn()));
         } else {
           Arrays.sort(recordHolderArray,
-              new NewRowComparatorForNormalDims(parameters.getDimColCount()));
+              new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
         }
 
         // create a new file every time

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index d42dc32..40e933d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -112,6 +112,12 @@ public class SortParameters {
    */
   private boolean[] noDictionaryDimnesionColumn;
 
+  private boolean[] noDictionarySortColumn;
+
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
   private int numberOfCores;
 
   public SortParameters getCopy() {
@@ -137,6 +143,9 @@ public class SortParameters {
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.noDictionarySortColumn = noDictionarySortColumn;
+    parameters.numberOfSortColumns = numberOfSortColumns;
+    parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
     parameters.numberOfCores = numberOfCores;
     return parameters;
   }
@@ -317,6 +326,30 @@ public class SortParameters {
     this.numberOfCores = numberOfCores;
   }
 
+  public int getNumberOfSortColumns() {
+    return numberOfSortColumns;
+  }
+
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount);
+  }
+
+  public boolean[] getNoDictionarySortColumn() {
+    return noDictionarySortColumn;
+  }
+
+  public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) {
+    this.noDictionarySortColumn = noDictionarySortColumn;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return numberOfNoDictSortColumns;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount);
+  }
+
   public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -334,6 +367,16 @@ public class SortParameters {
     parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
+    parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
+    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
+      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
+    } else {
+      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
+      System.arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0,
+          noDictionarySortColumnTemp, 0, parameters.getNumberOfSortColumns());
+      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
+    }
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index ae01404..2a4346d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -133,6 +133,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   private boolean[] isNoDictionaryDimensionColumn;
 
   /**
+   * to store whether each sort column is a no-dictionary column (true) or a dictionary column (false)
+   */
+  private boolean[] isNoDictionarySortColumn;
+
+  /**
    * Constructor to initialize
    *
    * @param tempFile
@@ -146,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
       int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
-      boolean[] isNoDictionaryDimensionColumn) {
+      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
     // set temp file
     this.tempFile = tempFile;
 
@@ -160,7 +165,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.fileBufferSize = fileBufferSize;
     this.executorService = Executors.newFixedThreadPool(1);
     this.aggType = aggType;
+
     this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
   }
 
   /**
@@ -409,7 +416,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     int[] rightMdkArray = (int[]) other.returnRow[0];
     byte[][] leftNonDictArray = (byte[][]) returnRow[1];
     byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
-    for (boolean isNoDictionary : isNoDictionaryDimensionColumn) {
+    for (boolean isNoDictionary : isNoDictionarySortColumn) {
       if (isNoDictionary) {
         diff = UnsafeComparer.INSTANCE
             .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2affa03..fe3579b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -709,13 +709,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     int i = 0;
     int dictionaryColumnCount = -1;
     int noDictionaryColumnCount = -1;
+    boolean isSortColumn = false;
     for (i = 0; i < dimensionType.length; i++) {
+      isSortColumn = i < segmentProperties.getNumberOfSortColumns();
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
           submit.add(executorService.submit(
-              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
-                  isUseInvertedIndex[i])));
+              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn)));
         } else {
           submit.add(
               executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -723,7 +725,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else {
         submit.add(executorService.submit(
             new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
-                true, isUseInvertedIndex[i])));
+                isSortColumn, isUseInvertedIndex[i] & isSortColumn)));
       }
     }
     for (int k = 0; k < complexColCount; k++) {
@@ -747,7 +749,42 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
     byte[] composedNonDictStartKey = null;
     byte[] composedNonDictEndKey = null;
-    if (noDictionaryStartKey != null) {
+
+    int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+    // generate start/end key by sort_columns
+    if (numberOfDictSortColumns > 0) {
+      // if sort_columns contain dictionary columns
+      int[] keySize = columnarSplitter.getBlockKeySize();
+      if (keySize.length > numberOfDictSortColumns) {
+        int newMdkLength = 0;
+        for (int index = 0; index < numberOfDictSortColumns; index++) {
+          newMdkLength += keySize[index];
+        }
+        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+        System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, newMdkLength);
+        System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength);
+        startkeyLocal = newStartKeyOfSortKey;
+        endKeyLocal = newEndKeyOfSortKey;
+      }
+    } else {
+      startkeyLocal = new byte[0];
+      endKeyLocal = new byte[0];
+    }
+
+    int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+    if (numberOfNoDictSortColumns > 0) {
+      // if sort_columns contain no-dictionary columns
+      if (noDictionaryStartKey.length > numberOfNoDictSortColumns) {
+        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+        System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0,
+            numberOfNoDictSortColumns);
+        System
+            .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+        noDictionaryStartKey = newNoDictionaryStartKey;
+        noDictionaryEndKey = newNoDictionaryEndKey;
+      }
       composedNonDictStartKey =
           NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey);
       composedNonDictEndKey =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 68f9bd5..f8454f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -101,9 +101,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   private boolean[] isNoDictionaryColumn;
 
+  private boolean[] isNoDictionarySortColumn;
+
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn) {
+      char[] aggType, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
@@ -112,6 +114,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.aggType = aggType;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
+    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
   }
 
   /**
@@ -180,7 +183,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn);
+                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
+                  isNoDictionarySortColumn);
 
           // initialize
           sortTempFileChunkHolder.initialize();


[3/3] incubator-carbondata git commit: [CARBONDATA-782]support SORT_COLUMNS This closes #715

Posted by ja...@apache.org.
[CARBONDATA-782]support SORT_COLUMNS This closes #715


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5763f8c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5763f8c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5763f8c6

Branch: refs/heads/12-dev
Commit: 5763f8c60372bb42e30464e90f922a01a2d05343
Parents: 3b62d25 f993908
Author: jackylk <ja...@huawei.com>
Authored: Fri Mar 31 21:08:43 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 31 21:08:43 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   1 +
 .../core/datastore/block/SegmentProperties.java |  22 ++
 .../impl/VariableLengthDimensionDataChunk.java  |  58 +++-
 .../core/keygenerator/mdkey/Bits.java           |   6 +-
 .../ThriftWrapperSchemaConverterImpl.java       |  18 +
 .../core/metadata/schema/table/CarbonTable.java |  28 ++
 .../schema/table/column/ColumnSchema.java       |  10 +
 .../impl/DictionaryBasedResultCollector.java    |  12 +-
 ...structureBasedDictionaryResultCollector.java |   4 +-
 .../carbondata/core/scan/filter/FilterUtil.java |  90 +++--
 .../visitor/NoDictionaryTypeVisitor.java        |   3 +-
 .../visitor/RangeNoDictionaryTypeVisitor.java   |   3 +-
 .../scan/result/vector/CarbonColumnVector.java  |   7 +
 .../util/AbstractDataFileFooterConverter.java   |   7 +
 .../apache/carbondata/core/util/ByteUtil.java   | 336 ++++++++++++++++++-
 .../carbondata/core/util/DataTypeUtil.java      |  70 ++++
 .../core/scan/filter/FilterUtilTest.java        |   7 +-
 .../impl/DictionaryDecodeReadSupport.java       |   3 +
 .../hadoop/test/util/StoreCreator.java          |   5 +
 .../testsuite/sortcolumns/TestSortColumns.scala | 188 +++++++++++
 .../spark/merger/RowResultMerger.java           |  17 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 118 ++++---
 .../execution/command/carbonTableSchema.scala   |  66 ++--
 .../apache/spark/sql/TestCarbonSqlParser.scala  |  56 +---
 .../readsupport/SparkRowReadSupportImpl.java    |  13 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  13 +
 .../org/apache/spark/sql/TableCreator.scala     | 138 ++++----
 .../newflow/CarbonDataLoadConfiguration.java    |  20 ++
 .../newflow/DataLoadProcessBuilder.java         |   2 +
 .../impl/NonDictionaryFieldConverterImpl.java   |  21 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   3 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   3 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  11 +-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  11 +-
 .../unsafe/comparator/UnsafeRowComparator.java  |  10 +-
 .../UnsafeRowComparatorForNormalDIms.java       |   8 +-
 .../holder/UnsafeFinalMergePageHolder.java      |   4 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |   5 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   2 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |   4 +-
 .../sortdata/IntermediateFileMerger.java        |   3 +-
 .../sortdata/NewRowComparator.java              |  12 +-
 .../sortdata/NewRowComparatorForNormalDims.java |  10 +-
 .../sortandgroupby/sortdata/RowComparator.java  |  10 +-
 .../sortdata/RowComparatorForNormalDims.java    |  10 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  13 +-
 .../sortandgroupby/sortdata/SortParameters.java |  43 +++
 .../sortdata/SortTempFileChunkHolder.java       |  11 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  45 ++-
 .../store/SingleThreadFinalSortFilesMerger.java |   8 +-
 50 files changed, 1253 insertions(+), 315 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-carbondata git commit: sort columns

Posted by ja...@apache.org.
sort columns


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f993908d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f993908d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f993908d

Branch: refs/heads/12-dev
Commit: f993908d3522ce6babed770817213d6b4c43ee06
Parents: 3b62d25
Author: QiangCai <qi...@qq.com>
Authored: Thu Mar 2 17:48:54 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 31 20:47:11 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   1 +
 .../core/datastore/block/SegmentProperties.java |  22 ++
 .../impl/VariableLengthDimensionDataChunk.java  |  58 +++-
 .../core/keygenerator/mdkey/Bits.java           |   6 +-
 .../ThriftWrapperSchemaConverterImpl.java       |  18 +
 .../core/metadata/schema/table/CarbonTable.java |  28 ++
 .../schema/table/column/ColumnSchema.java       |  10 +
 .../impl/DictionaryBasedResultCollector.java    |  12 +-
 ...structureBasedDictionaryResultCollector.java |   4 +-
 .../carbondata/core/scan/filter/FilterUtil.java |  90 +++--
 .../visitor/NoDictionaryTypeVisitor.java        |   3 +-
 .../visitor/RangeNoDictionaryTypeVisitor.java   |   3 +-
 .../scan/result/vector/CarbonColumnVector.java  |   7 +
 .../util/AbstractDataFileFooterConverter.java   |   7 +
 .../apache/carbondata/core/util/ByteUtil.java   | 336 ++++++++++++++++++-
 .../carbondata/core/util/DataTypeUtil.java      |  70 ++++
 .../core/scan/filter/FilterUtilTest.java        |   7 +-
 .../impl/DictionaryDecodeReadSupport.java       |   3 +
 .../hadoop/test/util/StoreCreator.java          |   5 +
 .../testsuite/sortcolumns/TestSortColumns.scala | 188 +++++++++++
 .../spark/merger/RowResultMerger.java           |  17 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 118 ++++---
 .../execution/command/carbonTableSchema.scala   |  66 ++--
 .../apache/spark/sql/TestCarbonSqlParser.scala  |  56 +---
 .../readsupport/SparkRowReadSupportImpl.java    |  13 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  13 +
 .../org/apache/spark/sql/TableCreator.scala     | 138 ++++----
 .../newflow/CarbonDataLoadConfiguration.java    |  20 ++
 .../newflow/DataLoadProcessBuilder.java         |   2 +
 .../impl/NonDictionaryFieldConverterImpl.java   |  21 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   3 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   3 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  11 +-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  11 +-
 .../unsafe/comparator/UnsafeRowComparator.java  |  10 +-
 .../UnsafeRowComparatorForNormalDIms.java       |   8 +-
 .../holder/UnsafeFinalMergePageHolder.java      |   4 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |   5 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   2 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |   4 +-
 .../sortdata/IntermediateFileMerger.java        |   3 +-
 .../sortdata/NewRowComparator.java              |  12 +-
 .../sortdata/NewRowComparatorForNormalDims.java |  10 +-
 .../sortandgroupby/sortdata/RowComparator.java  |  10 +-
 .../sortdata/RowComparatorForNormalDims.java    |  10 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  13 +-
 .../sortandgroupby/sortdata/SortParameters.java |  43 +++
 .../sortdata/SortTempFileChunkHolder.java       |  11 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  45 ++-
 .../store/SingleThreadFinalSortFilesMerger.java |   8 +-
 50 files changed, 1253 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 789c321..248b129 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -805,6 +805,7 @@ public final class CarbonCommonConstants {
   public static final String COLUMN_GROUPS = "column_groups";
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
+  public static final String SORT_COLUMNS = "sort_columns";
   public static final String PARTITIONCLASS = "partitionclass";
   public static final String PARTITIONCOUNT = "partitioncount";
   public static final String COLUMN_PROPERTIES = "columnproperties";

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 6c7d014..3bef423 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -151,6 +151,10 @@ public class SegmentProperties {
    */
   private ColumnGroupModel colGroupModel;
 
+  private int numberOfSortColumns = 0;
+
+  private int numberOfNoDictSortColumns = 0;
+
   public SegmentProperties(List<ColumnSchema> columnsInTable, int[] columnCardinality) {
     dimensions = new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     complexDimensions =
@@ -336,6 +340,9 @@ public class SegmentProperties {
         if (CarbonUtil.hasEncoding(columnSchema.getEncodingList(), Encoding.DICTIONARY)
             && !isComplexDimensionStarted && columnSchema.getNumberOfChild() == 0) {
           cardinalityIndexForNormalDimensionColumn.add(tableOrdinal);
+          if (columnSchema.isSortColumn()) {
+            this.numberOfSortColumns++;
+          }
           if (columnSchema.isColumnar()) {
             // if it is a columnar dimension participated in mdkey then added
             // key ordinal and dimension ordinal
@@ -385,6 +392,10 @@ public class SegmentProperties {
           // for no dictionary dimension
           carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, -1);
           numberOfNoDictionaryDimension++;
+          if (columnSchema.isSortColumn()) {
+            this.numberOfSortColumns++;
+            this.numberOfNoDictSortColumns++;
+          }
         }
         dimensions.add(carbonDimension);
       } else {
@@ -797,4 +808,15 @@ public class SegmentProperties {
     return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
   }
 
+  public int getNumberOfSortColumns() {
+    return numberOfSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfDictSortColumns() {
+    return this.numberOfSortColumns - this.numberOfNoDictionaryDimension;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
index 92c1e2f..4897985 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -24,6 +24,18 @@ import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFacto
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
 
 /**
  * This class is gives access to variable length dimension data chunk store
@@ -33,8 +45,8 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk
   /**
    * Constructor for this class
    *
-   * @param dataChunkStore  data chunk
-   * @param chunkAttributes chunk attributes
+   * @param dataChunks  data chunk
+   * @param invertedIndex inverted index of the data chunk
    */
   public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] invertedIndex,
       int[] invertedIndexReverse, int numberOfRows) {
@@ -114,7 +126,26 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk
       if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
         vector.putNull(vectorOffset++);
       } else {
-        vector.putBytes(vectorOffset++, value);
+        DataType dt = vector.getType();
+        if (dt instanceof StringType) {
+          vector.putBytes(vectorOffset++, value);
+        } else if (dt instanceof BooleanType) {
+          vector.putBoolean(vectorOffset++, ByteUtil.toBoolean(value));
+        } else if (dt instanceof ShortType) {
+          vector.putShort(vectorOffset++, ByteUtil.toShort(value, 0, value.length));
+        } else if (dt instanceof IntegerType) {
+          vector.putInt(vectorOffset++, ByteUtil.toInt(value, 0, value.length));
+        } else if (dt instanceof FloatType) {
+          vector.putFloat(vectorOffset++, ByteUtil.toFloat(value, 0));
+        } else if (dt instanceof DoubleType) {
+          vector.putDouble(vectorOffset++, ByteUtil.toDouble(value, 0));
+        } else if (dt instanceof LongType) {
+          vector.putLong(vectorOffset++, ByteUtil.toLong(value, 0, value.length));
+        } else if (dt instanceof DecimalType) {
+          vector.putDecimal(vectorOffset++,
+              Decimal.apply(ByteUtil.toBigDecimal(value, 0, value.length)),
+              DecimalType.MAX_PRECISION());
+        }
       }
     }
     return column + 1;
@@ -143,7 +174,26 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk
       if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
         vector.putNull(vectorOffset++);
       } else {
-        vector.putBytes(vectorOffset++, value);
+        DataType dt = vector.getType();
+        if (dt instanceof StringType) {
+          vector.putBytes(vectorOffset++, value);
+        } else if (dt instanceof BooleanType) {
+          vector.putBoolean(vectorOffset++, ByteUtil.toBoolean(value));
+        } else if (dt instanceof ShortType) {
+          vector.putShort(vectorOffset++, ByteUtil.toShort(value, 0, value.length));
+        } else if (dt instanceof IntegerType) {
+          vector.putInt(vectorOffset++, ByteUtil.toInt(value, 0, value.length));
+        } else if (dt instanceof FloatType) {
+          vector.putFloat(vectorOffset++, ByteUtil.toFloat(value, 0));
+        } else if (dt instanceof DoubleType) {
+          vector.putDouble(vectorOffset++, ByteUtil.toDouble(value, 0));
+        } else if (dt instanceof LongType) {
+          vector.putLong(vectorOffset++, ByteUtil.toLong(value, 0, value.length));
+        } else if (dt instanceof DecimalType) {
+          vector.putDecimal(vectorOffset++,
+              Decimal.apply(ByteUtil.toBigDecimal(value, 0, value.length)),
+              DecimalType.MAX_PRECISION());
+        }
       }
     }
     return column + 1;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
index 7bd338d..044a77b 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
@@ -119,7 +119,8 @@ public class Bits implements Serializable {
   protected long[] get(long[] keys) {
     long[] words = new long[wsize];
     int ll = 0;
-    for (int i = lens.length - 1; i >= 0; i--) {
+    int minLength = Math.min(lens.length, keys.length);
+    for (int i = minLength - 1; i >= 0; i--) {
 
       long val = keys[i];
 
@@ -155,7 +156,8 @@ public class Bits implements Serializable {
   protected long[] get(int[] keys) {
     long[] words = new long[wsize];
     int ll = 0;
-    for (int i = lens.length - 1; i >= 0; i--) {
+    int minLength = Math.min(lens.length, keys.length);
+    for (int i = minLength - 1; i >= 0; i--) {
 
       long val = keys[i];
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 09ed368..3047b65 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -17,8 +17,11 @@
 package org.apache.carbondata.core.metadata.converter;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
@@ -172,6 +175,13 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     thriftColumnSchema.setInvisible(wrapperColumnSchema.isInvisible());
     thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
+
+    if (wrapperColumnSchema.isSortColumn()) {
+      Map<String, String> properties = new HashMap<String, String>();
+      properties.put(CarbonCommonConstants.SORT_COLUMNS, "true");
+      thriftColumnSchema.setColumnProperties(properties);
+    }
+
     return thriftColumnSchema;
   }
 
@@ -360,6 +370,14 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperColumnSchema.setInvisible(externalColumnSchema.isInvisible());
     wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    wrapperColumnSchema.setSortColumn(false);
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      String sortColumns = properties.get(CarbonCommonConstants.SORT_COLUMNS);
+      if (sortColumns != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
     return wrapperColumnSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index dbc8836..1271b6a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -116,6 +116,16 @@ public class CarbonTable implements Serializable {
    */
   private int blockSize;
 
+  /**
+   * the number of columns in SORT_COLUMNS
+   */
+  private int numberOfSortColumns;
+
+  /**
+   * the number of no dictionary columns in SORT_COLUMNS
+   */
+  private int numberOfNoDictSortColumns;
+
   public CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -238,10 +248,16 @@ public class CarbonTable implements Serializable {
           i = dimensionOrdinal - 1;
           complexTypeOrdinal = assignComplexOrdinal(complexDimension, complexTypeOrdinal);
         } else {
+          if (!columnSchema.isInvisible() && columnSchema.isSortColumn()) {
+            this.numberOfSortColumns++;
+          }
           if (!columnSchema.getEncodingList().contains(Encoding.DICTIONARY)) {
             CarbonDimension dimension =
                     new CarbonDimension(columnSchema, dimensionOrdinal++,
                             columnSchema.getSchemaOrdinal(), -1, -1, -1);
+            if (!columnSchema.isInvisible() && columnSchema.isSortColumn()) {
+              this.numberOfNoDictSortColumns++;
+            }
             allDimensions.add(dimension);
             primitiveDimensions.add(dimension);
           } else if (columnSchema.getEncodingList().contains(Encoding.DICTIONARY)
@@ -639,4 +655,16 @@ public class CarbonTable implements Serializable {
     }
     tableMeasuresMap.put(tableName, visibleMeasures);
   }
+
+  public boolean isSortByColumns() {
+    return numberOfSortColumns > 0;
+  }
+
+  public int getNumberOfSortColumns() {
+    return numberOfSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return numberOfNoDictSortColumns;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index 7aa0900..08969fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -109,6 +109,8 @@ public class ColumnSchema implements Serializable {
    */
   private boolean invisible = false;
 
+  private boolean isSortColumn = false;
+
   /**
    * @return the columnName
    */
@@ -403,4 +405,12 @@ public class ColumnSchema implements Serializable {
   public void setSchemaOrdinal(int schemaOrdinal) {
     this.schemaOrdinal = schemaOrdinal;
   }
+
+  public boolean isSortColumn() {
+    return isSortColumn;
+  }
+
+  public void setSortColumn(boolean sortColumn) {
+    isSortColumn = sortColumn;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index e5e4b3c..a7c93bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -88,7 +88,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     int rowCounter = 0;
     int[] surrogateResult;
-    String[] noDictionaryKeys;
+    byte[][] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
         scannedResult.getDeleteDeltaDataCache();
@@ -96,7 +96,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
-        noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
+        noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
         complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
         dictionaryColumnIndex = 0;
         noDictionaryColumnIndex = 0;
@@ -120,7 +120,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
   }
 
   protected void fillDimensionData(AbstractScannedResult scannedResult, int[] surrogateResult,
-      String[] noDictionaryKeys, byte[][] complexTypeKeyArray,
+      byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
       Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
     if (!dictionaryEncodingArray[i]) {
       if (implictColumnArray[i]) {
@@ -134,9 +134,9 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
               DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
         }
       } else {
-        row[order[i]] = DataTypeUtil
-            .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
-                queryDimensions[i].getDimension().getDataType());
+        row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
+            noDictionaryKeys[noDictionaryColumnIndex++],
+            queryDimensions[i].getDimension().getDataType());
       }
     } else if (directDictionaryEncodingArray[i]) {
       if (directDictionaryGenerators[i] != null) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 71045ff..7c07923 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -48,7 +48,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     int rowCounter = 0;
     int[] surrogateResult;
-    String[] noDictionaryKeys;
+    byte[][] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
         scannedResult.getDeleteDeltaDataCache();
@@ -58,7 +58,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
-        noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
+        noDictionaryKeys = scannedResult.getNoDictionaryKeyArray();
         complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
         dictionaryColumnIndex = 0;
         noDictionaryColumnIndex = 0;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 770aa7e..bb26ea5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -355,10 +355,23 @@ public final class FilterUtil {
    * @return DimColumnFilterInfo
    */
   public static DimColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
-      List<String> evaluateResultListFinal, boolean isIncludeFilter) {
+      List<String> evaluateResultListFinal, boolean isIncludeFilter, DataType dataType)
+      throws FilterUnsupportedException {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
-    for (String result : evaluateResultListFinal) {
-      filterValuesList.add(result.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+    String result = null;
+    try {
+      int length = evaluateResultListFinal.size();
+      for (int i = 0; i < length; i++) {
+        result = evaluateResultListFinal.get(i);
+        if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(result)) {
+          filterValuesList.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+          continue;
+        }
+        filterValuesList.add(
+              DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(result, dataType));
+      }
+    } catch (Throwable ex) {
+      throw new FilterUnsupportedException("Unsupported Filter condition: " + result, ex);
     }
 
     Comparator<byte[]> filterNoDictValueComaparator = new Comparator<byte[]>() {
@@ -879,8 +892,10 @@ public final class FilterUtil {
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
       for (DimColumnFilterInfo info : values) {
-        if (startKey[keyOrdinalOfDimensionFromCurrentBlock] < info.getFilterList().get(0)) {
-          startKey[keyOrdinalOfDimensionFromCurrentBlock] = info.getFilterList().get(0);
+        if (keyOrdinalOfDimensionFromCurrentBlock < startKey.length) {
+          if (startKey[keyOrdinalOfDimensionFromCurrentBlock] < info.getFilterList().get(0)) {
+            startKey[keyOrdinalOfDimensionFromCurrentBlock] = info.getFilterList().get(0);
+          }
         }
       }
       long[] newStartKey = new long[startKey.length];
@@ -944,10 +959,12 @@ public final class FilterUtil {
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
       for (DimColumnFilterInfo info : values) {
-        if (endKey[keyOrdinalOfDimensionFromCurrentBlock] > info.getFilterList()
-            .get(info.getFilterList().size() - 1)) {
-          endKey[keyOrdinalOfDimensionFromCurrentBlock] =
-              info.getFilterList().get(info.getFilterList().size() - 1);
+        if (keyOrdinalOfDimensionFromCurrentBlock < endKey.length) {
+          if (endKey[keyOrdinalOfDimensionFromCurrentBlock] > info.getFilterList()
+              .get(info.getFilterList().size() - 1)) {
+            endKey[keyOrdinalOfDimensionFromCurrentBlock] =
+                info.getFilterList().get(info.getFilterList().size() - 1);
+          }
         }
       }
       long[] newEndKey = new long[endKey.length];
@@ -1040,12 +1057,10 @@ public final class FilterUtil {
    */
   public static IndexKey prepareDefaultEndIndexKey(SegmentProperties segmentProperties)
       throws KeyGenException {
-    long[] dictionarySurrogateKey =
-        new long[segmentProperties.getDimensions().size() - segmentProperties
-            .getNumberOfNoDictionaryDimension()];
+    long[] dictionarySurrogateKey = new long[segmentProperties.getNumberOfDictSortColumns()];
     int index = 0;
     int[] dimColumnsCardinality = segmentProperties.getDimColumnsCardinality();
-    for (int i = 0; i < dimColumnsCardinality.length; i++) {
+    for (int i = 0; i < dictionarySurrogateKey.length; i++) {
       dictionarySurrogateKey[index++] = dimColumnsCardinality[i];
     }
     IndexKey endIndexKey;
@@ -1057,27 +1072,28 @@ public final class FilterUtil {
   }
 
   public static byte[] getNoDictionaryDefaultEndKey(SegmentProperties segmentProperties) {
+
+    int numberOfNoDictionaryDimension = segmentProperties.getNumberOfNoDictSortColumns();
     // in case of non filter query when no dictionary columns are present we
     // need to set the default end key, as for non filter query
     // we need to get the last
     // block of the btree so we are setting the max byte value in the end key
     ByteBuffer noDictionaryEndKeyBuffer = ByteBuffer.allocate(
-        (segmentProperties.getNumberOfNoDictionaryDimension()
-            * CarbonCommonConstants.SHORT_SIZE_IN_BYTE) + segmentProperties
-            .getNumberOfNoDictionaryDimension());
+        (numberOfNoDictionaryDimension * CarbonCommonConstants.SHORT_SIZE_IN_BYTE)
+            + numberOfNoDictionaryDimension);
     // end key structure will be
     //<Offset of first No Dictionary key in 2 Bytes><Offset of second No Dictionary key in 2 Bytes>
     //<Offset of n No Dictionary key in 2 Bytes><first no dictionary column value>
     // <second no dictionary column value> <N no dictionary column value>
     //example if we have 2 no dictionary column
     //<[0,4,0,5,127,127]>
-    short startPoint = (short) (segmentProperties.getNumberOfNoDictionaryDimension()
-        * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+    short startPoint =
+        (short) (numberOfNoDictionaryDimension * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    for (int i = 0; i < numberOfNoDictionaryDimension; i++) {
       noDictionaryEndKeyBuffer.putShort((startPoint));
       startPoint++;
     }
-    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+    for (int i = 0; i < numberOfNoDictionaryDimension; i++) {
       noDictionaryEndKeyBuffer.put((byte) 127);
     }
     return noDictionaryEndKeyBuffer.array();
@@ -1094,9 +1110,7 @@ public final class FilterUtil {
   public static IndexKey prepareDefaultStartIndexKey(SegmentProperties segmentProperties)
       throws KeyGenException {
     IndexKey startIndexKey;
-    long[] dictionarySurrogateKey =
-        new long[segmentProperties.getDimensions().size() - segmentProperties
-            .getNumberOfNoDictionaryDimension()];
+    long[] dictionarySurrogateKey = new long[segmentProperties.getNumberOfDictSortColumns()];
     byte[] dictionaryStartMdkey =
         segmentProperties.getDimensionKeyGenerator().generateKey(dictionarySurrogateKey);
     byte[] noDictionaryStartKeyArray = getNoDictionaryDefaultStartKey(segmentProperties);
@@ -1106,26 +1120,27 @@ public final class FilterUtil {
   }
 
   public static byte[] getNoDictionaryDefaultStartKey(SegmentProperties segmentProperties) {
+
+    int numberOfNoDictionaryDimension = segmentProperties.getNumberOfNoDictSortColumns();
     // in case of non filter query when no dictionary columns are present we
     // need to set the default start key, as for non filter query we need to get the first
     // block of the btree so we are setting the least byte value in the start key
     ByteBuffer noDictionaryStartKeyBuffer = ByteBuffer.allocate(
-        (segmentProperties.getNumberOfNoDictionaryDimension()
-            * CarbonCommonConstants.SHORT_SIZE_IN_BYTE) + segmentProperties
-            .getNumberOfNoDictionaryDimension());
+        (numberOfNoDictionaryDimension * CarbonCommonConstants.SHORT_SIZE_IN_BYTE)
+            + numberOfNoDictionaryDimension);
     // end key structure will be
     //<Offset of first No Dictionary key in 2 Bytes><Offset of second No Dictionary key in 2 Bytes>
     //<Offset of n No Dictionary key in 2 Bytes><first no dictionary column value>
     // <second no dictionary column value> <N no dictionary column value>
     //example if we have 2 no dictionary column
     //<[0,4,0,5,0,0]>
-    short startPoint = (short) (segmentProperties.getNumberOfNoDictionaryDimension()
-        * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+    short startPoint =
+        (short) (numberOfNoDictionaryDimension * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    for (int i = 0; i < numberOfNoDictionaryDimension; i++) {
       noDictionaryStartKeyBuffer.putShort((startPoint));
       startPoint++;
     }
-    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+    for (int i = 0; i < numberOfNoDictionaryDimension; i++) {
       noDictionaryStartKeyBuffer.put((byte) 0);
     }
     return noDictionaryStartKeyBuffer.array();
@@ -1181,8 +1196,8 @@ public final class FilterUtil {
       FilterResolverIntf filterResolver, List<IndexKey> listOfStartEndKeys) {
     IndexKey searchStartKey = null;
     IndexKey searchEndKey = null;
-    long[] startKey = new long[segmentProperties.getDimensionKeyGenerator().getDimCount()];
-    long[] endKey = new long[segmentProperties.getDimensionKeyGenerator().getDimCount()];
+    long[] startKey = new long[segmentProperties.getNumberOfDictSortColumns()];
+    long[] endKey = new long[segmentProperties.getNumberOfDictSortColumns()];
     List<byte[]> listOfStartKeyByteArray =
         new ArrayList<byte[]>(segmentProperties.getNumberOfNoDictionaryDimension());
     List<byte[]> listOfEndKeyByteArray =
@@ -1234,6 +1249,17 @@ public final class FilterUtil {
     pruneStartAndEndKeys(setOfStartKeyByteArray, listOfStartKeyByteArray);
     pruneStartAndEndKeys(setOfEndKeyByteArray, listOfEndKeyByteArray);
 
+    if (segmentProperties.getNumberOfNoDictSortColumns() == 0) {
+      listOfStartKeyByteArray = new ArrayList<byte[]>();
+      listOfEndKeyByteArray = new ArrayList<byte[]>();
+    } else if (segmentProperties.getNumberOfNoDictSortColumns() < listOfStartKeyByteArray
+        .size()) {
+      while (segmentProperties.getNumberOfNoDictSortColumns() < listOfStartKeyByteArray.size()) {
+        listOfStartKeyByteArray.remove(listOfStartKeyByteArray.size() - 1);
+        listOfEndKeyByteArray.remove(listOfEndKeyByteArray.size() - 1);
+      }
+    }
+
     searchStartKey = FilterUtil
         .createIndexKeyFromResolvedFilterVal(startKey, segmentProperties.getDimensionKeyGenerator(),
             FilterUtil.getKeyWithIndexesAndValues(listOfStartKeyByteArray));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java
index 351d2c0..efedd93 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java
@@ -67,7 +67,8 @@ public class NoDictionaryTypeVisitor implements ResolvedFilterInfoVisitorIntf {
       throw new FilterUnsupportedException(e);
     }
     resolvedFilterObject = FilterUtil
-        .getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, metadata.isIncludeFilter());
+        .getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, metadata.isIncludeFilter(),
+            metadata.getColumnExpression().getDataType());
     visitableObj.setFilterValues(resolvedFilterObject);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/RangeNoDictionaryTypeVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/RangeNoDictionaryTypeVisitor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/RangeNoDictionaryTypeVisitor.java
index 8d0a8b4..d703ed0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/RangeNoDictionaryTypeVisitor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/RangeNoDictionaryTypeVisitor.java
@@ -72,7 +72,8 @@ public class RangeNoDictionaryTypeVisitor extends NoDictionaryTypeVisitor
       throw new FilterUnsupportedException(e);
     }
     resolvedFilterObject = FilterUtil
-        .getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, metadata.isIncludeFilter());
+        .getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, metadata.isIncludeFilter(),
+            metadata.getColumnExpression().getDataType());
     visitableObj.setFilterValues(resolvedFilterObject);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 4952e07..82a0b45 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -17,10 +17,15 @@
 
 package org.apache.carbondata.core.scan.result.vector;
 
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 public interface CarbonColumnVector {
 
+  void putBoolean(int rowId, boolean value);
+
+  void putFloat(int rowId, float value);
+
   void putShort(int rowId, short value);
 
   void putShorts(int rowId, int count, short value);
@@ -59,4 +64,6 @@ public interface CarbonColumnVector {
 
   void reset();
 
+  DataType getType();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 48a081d..52f2f31 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.BlockInfo;
@@ -195,6 +196,12 @@ public abstract class AbstractDataFileFooterConverter {
     wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
     wrapperColumnSchema.setScale(externalColumnSchema.getScale());
     wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
     return wrapperColumnSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 9b2c2ed..84a8634 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -17,7 +17,11 @@
 
 package org.apache.carbondata.core.util;
 
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
@@ -27,7 +31,13 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
  */
 public final class ByteUtil {
 
-  private static final int SIZEOF_LONG = 8;
+  public static final int SIZEOF_LONG = 8;
+
+  public static final int SIZEOF_INT = 4;
+
+  public static final int SIZEOF_SHORT = 2;
+
+  public static final String UTF8_CSN = StandardCharsets.UTF_8.name();
 
   private ByteUtil() {
 
@@ -363,4 +373,328 @@ public final class ByteUtil {
 
   }
 
+  /**
+   * String => byte[]
+   *
+   * @param s
+   * @return
+   */
+  public static byte[] toBytes(String s) {
+    try {
+      return s.getBytes(UTF8_CSN);
+    } catch (UnsupportedEncodingException e) {
+      // should never happen!
+      throw new IllegalArgumentException("UTF8 decoding is not supported", e);
+    }
+  }
+
+  /**
+   * byte[] => String
+   *
+   * @param b
+   * @param off
+   * @param len
+   * @return
+   */
+  public static String toString(final byte[] b, int off, int len) {
+    if (b == null) {
+      return null;
+    }
+    if (len == 0) {
+      return "";
+    }
+    try {
+      return new String(b, off, len, UTF8_CSN);
+    } catch (UnsupportedEncodingException e) {
+      // should never happen!
+      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+    }
+  }
+
+  /**
+   * boolean => byte[]
+   *
+   * @param b
+   * @return
+   */
+  public static byte[] toBytes(final boolean b) {
+    return new byte[] { b ? (byte) -1 : (byte) 0 };
+  }
+
+  /**
+   * byte[] => boolean
+   *
+   * @param b
+   * @return
+   */
+  public static boolean toBoolean(final byte[] b) {
+    if (b.length != 1) {
+      throw new IllegalArgumentException("Array has wrong size: " + b.length);
+    }
+    return b[0] != (byte) 0;
+  }
+
+  /**
+   * short => byte[]
+   *
+   * @param val
+   * @return
+   */
+  public static byte[] toBytes(short val) {
+    byte[] b = new byte[SIZEOF_SHORT];
+    b[1] = (byte) val;
+    val >>= 8;
+    b[0] = (byte) val;
+    return b;
+  }
+
+  /**
+   * byte[] => short
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return
+   */
+  public static short toShort(byte[] bytes, int offset, final int length) {
+    if (length != SIZEOF_SHORT || offset + length > bytes.length) {
+      throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT);
+    }
+    if (CarbonUnsafe.unsafe != null) {
+      if (CarbonUnsafe.ISLITTLEENDIAN) {
+        return Short.reverseBytes(
+            CarbonUnsafe.unsafe.getShort(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET));
+      } else {
+        return CarbonUnsafe.unsafe.getShort(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET);
+      }
+    } else {
+      short n = 0;
+      n ^= bytes[offset] & 0xFF;
+      n <<= 8;
+      n ^= bytes[offset + 1] & 0xFF;
+      return n;
+    }
+  }
+
+  /**
+   * int => byte[]
+   *
+   * @param val
+   * @return
+   */
+  public static byte[] toBytes(int val) {
+    byte[] b = new byte[4];
+    for (int i = 3; i > 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    b[0] = (byte) val;
+    return b;
+  }
+
+  /**
+   * byte[] => int
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return
+   */
+  public static int toInt(byte[] bytes, int offset, final int length) {
+    if (length != SIZEOF_INT || offset + length > bytes.length) {
+      throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
+    }
+    if (CarbonUnsafe.unsafe != null) {
+      if (CarbonUnsafe.ISLITTLEENDIAN) {
+        return Integer.reverseBytes(
+            CarbonUnsafe.unsafe.getInt(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET));
+      } else {
+        return CarbonUnsafe.unsafe.getInt(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET);
+      }
+    } else {
+      int n = 0;
+      for (int i = offset; i < (offset + length); i++) {
+        n <<= 8;
+        n ^= bytes[i] & 0xFF;
+      }
+      return n;
+    }
+  }
+
+  /**
+   * float => byte[]
+   *
+   * @param f
+   * @return
+   */
+  public static byte[] toBytes(final float f) {
+    // Encode it as int
+    return toBytes(Float.floatToRawIntBits(f));
+  }
+
+  /**
+   * byte[] => float
+   *
+   * @param bytes
+   * @param offset
+   * @return
+   */
+  public static float toFloat(byte[] bytes, int offset) {
+    return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
+  }
+
+  /**
+   * long => byte[]
+   *
+   * @param val
+   * @return
+   */
+  public static byte[] toBytes(long val) {
+    byte[] b = new byte[8];
+    for (int i = 7; i > 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    b[0] = (byte) val;
+    return b;
+  }
+
+  /**
+   * byte[] => long
+   */
+  public static long toLong(byte[] bytes, int offset, final int length) {
+    if (length != SIZEOF_LONG || offset + length > bytes.length) {
+      throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
+    }
+    if (CarbonUnsafe.unsafe != null) {
+      if (CarbonUnsafe.ISLITTLEENDIAN) {
+        return Long.reverseBytes(
+            CarbonUnsafe.unsafe.getLong(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET));
+      } else {
+        return CarbonUnsafe.unsafe.getLong(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET);
+      }
+    } else {
+      long l = 0;
+      for (int i = offset; i < offset + length; i++) {
+        l <<= 8;
+        l ^= bytes[i] & 0xFF;
+      }
+      return l;
+    }
+  }
+
+  /**
+   * double => byte[]
+   *
+   * @param d
+   * @return
+   */
+  public static byte[] toBytes(final double d) {
+    // Encode it as a long
+    return toBytes(Double.doubleToRawLongBits(d));
+  }
+
+  /**
+   * byte[] => double
+   *
+   * @param bytes
+   * @param offset
+   * @return
+   */
+  public static double toDouble(final byte[] bytes, final int offset) {
+    return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG));
+  }
+
+  /**
+   * BigDecimal => byte[]
+   *
+   * @param val
+   * @return
+   */
+  public static byte[] toBytes(BigDecimal val) {
+    byte[] valueBytes = val.unscaledValue().toByteArray();
+    byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+    int offset = putInt(result, 0, val.scale());
+    putBytes(result, offset, valueBytes, 0, valueBytes.length);
+    return result;
+  }
+
+  /**
+   * byte[] => BigDecimal
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return
+   */
+  public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
+    if (bytes == null || length < SIZEOF_INT + 1 || (offset + length > bytes.length)) {
+      return null;
+    }
+
+    int scale = toInt(bytes, offset, bytes.length);
+    byte[] tcBytes = new byte[length - SIZEOF_INT];
+    System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
+    return new BigDecimal(new BigInteger(tcBytes), scale);
+  }
+
+  private static IllegalArgumentException explainWrongLengthOrOffset(final byte[] bytes,
+      final int offset, final int length, final int expectedLength) {
+    String reason;
+    if (length != expectedLength) {
+      reason = "Wrong length: " + length + ", expected " + expectedLength;
+    } else {
+      reason = "offset (" + offset + ") + length (" + length + ") exceed the"
+          + " capacity of the array: " + bytes.length;
+    }
+    return new IllegalArgumentException(reason);
+  }
+
+  /**
+   * Put an int value out to the specified byte array position.
+   *
+   * @param bytes  the byte array
+   * @param offset position in the array
+   * @param val    int to write out
+   * @return incremented offset
+   * @throws IllegalArgumentException if the byte array given doesn't have
+   *                                  enough room at the offset specified.
+   */
+  public static int putInt(byte[] bytes, int offset, int val) {
+    if (bytes.length - offset < SIZEOF_INT) {
+      throw new IllegalArgumentException(
+          "Not enough room to put an int at" + " offset " + offset + " in a " + bytes.length
+              + " byte array");
+    }
+    if (CarbonUnsafe.unsafe != null) {
+      if (CarbonUnsafe.ISLITTLEENDIAN) {
+        val = Integer.reverseBytes(val);
+      }
+      CarbonUnsafe.unsafe.putInt(bytes, offset + CarbonUnsafe.BYTE_ARRAY_OFFSET, val);
+      return offset + ByteUtil.SIZEOF_INT;
+    } else {
+      for (int i = offset + 3; i > offset; i--) {
+        bytes[i] = (byte) val;
+        val >>>= 8;
+      }
+      bytes[offset] = (byte) val;
+      return offset + SIZEOF_INT;
+    }
+  }
+
+  /**
+   * Put bytes at the specified byte array position.
+   *
+   * @param tgtBytes  the byte array
+   * @param tgtOffset position in the array
+   * @param srcBytes  array to write out
+   * @param srcOffset source offset
+   * @param srcLength source length
+   * @return incremented offset
+   */
+  public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int srcOffset,
+      int srcLength) {
+    System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
+    return tgtOffset + srcLength;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 10411b0..12ec058 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -320,6 +320,76 @@ public final class DataTypeUtil {
 
   }
 
+  public static byte[] getBytesBasedOnDataTypeForNoDictionaryColumn(String dimensionValue,
+      DataType actualDataType) throws Throwable {
+    switch (actualDataType) {
+      case STRING:
+        return ByteUtil.toBytes(dimensionValue);
+      case BOOLEAN:
+        return ByteUtil.toBytes(Boolean.parseBoolean(dimensionValue));
+      case SHORT:
+        return ByteUtil.toBytes(Short.parseShort(dimensionValue));
+      case INT:
+        return ByteUtil.toBytes(Integer.parseInt(dimensionValue));
+      case FLOAT:
+        return ByteUtil.toBytes(Float.parseFloat(dimensionValue));
+      case LONG:
+        return ByteUtil.toBytes(Long.parseLong(dimensionValue));
+      case DOUBLE:
+        return ByteUtil.toBytes(Double.parseDouble(dimensionValue));
+      case DECIMAL:
+        return ByteUtil.toBytes(new BigDecimal(dimensionValue));
+      default:
+        return ByteUtil.toBytes(dimensionValue);
+    }
+  }
+
+
+  /**
+   * Below method will be used to convert the data passed to its actual data
+   * type
+   *
+   * @param dataInBytes    data
+   * @param actualDataType actual data type
+   * @return actual data after conversion
+   */
+  public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBytes,
+      DataType actualDataType) {
+    if (null == dataInBytes || Arrays
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case STRING:
+          return UTF8String.fromBytes(dataInBytes);
+        case BOOLEAN:
+          return ByteUtil.toBoolean(dataInBytes);
+        case SHORT:
+          return ByteUtil.toShort(dataInBytes, 0, dataInBytes.length);
+        case INT:
+          return ByteUtil.toInt(dataInBytes, 0, dataInBytes.length);
+        case FLOAT:
+          return ByteUtil.toFloat(dataInBytes, 0);
+        case LONG:
+          return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length);
+        case DOUBLE:
+          return ByteUtil.toDouble(dataInBytes, 0);
+        case DECIMAL:
+          return ByteUtil.toBigDecimal(dataInBytes, 0, dataInBytes.length);
+        default:
+          return ByteUtil.toString(dataInBytes, 0, dataInBytes.length);
+      }
+    } catch (Throwable ex) {
+      String data = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+      LOGGER.error("Cannot convert " + data + " to " + actualDataType.getName() + " type value " + ex
+          .getMessage());
+      LOGGER.error("Problem while converting data type " + data);
+      return null;
+    }
+  }
+
+
   /**
    * Below method will be used to convert the data passed to its actual data
    * type

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
index 06063a4..406fbb7 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 
 import mockit.Mock;
@@ -362,7 +363,7 @@ public class FilterUtilTest extends AbstractDictionaryCacheTest {
     assertFalse(result);
   }
 
-  @Test public void testGetNoDictionaryValKeyMemberForFilter() {
+  @Test public void testGetNoDictionaryValKeyMemberForFilter() throws FilterUnsupportedException {
     boolean isIncludeFilter = true;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         new AbsoluteTableIdentifier(this.carbonStorePath, carbonTableIdentifier);
@@ -370,7 +371,9 @@ public class FilterUtilTest extends AbstractDictionaryCacheTest {
     List<String> evaluateResultListFinal = new ArrayList<>();
     evaluateResultListFinal.add("test1");
     evaluateResultListFinal.add("test2");
-    assertTrue(FilterUtil.getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, isIncludeFilter) instanceof DimColumnFilterInfo);
+    assertTrue(FilterUtil
+        .getNoDictionaryValKeyMemberForFilter(evaluateResultListFinal, isIncludeFilter,
+            DataType.STRING) instanceof DimColumnFilterInfo);
   }
 
   @Test public void testPrepareDefaultStartIndexKey() throws KeyGenException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
index 43953d0..cfdbd37 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -87,6 +87,9 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
    * threshold is reached
    */
   @Override public void close() {
+    if (dictionaries == null) {
+      return;
+    }
     for (int i = 0; i < dictionaries.length; i++) {
       CarbonUtil.clearDictionaryCache(dictionaries[i]);
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 51ce2c5..aa6a2b4 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -202,6 +202,7 @@ public class StoreCreator {
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
     date.setColumnGroup(2);
+    date.setSortColumn(true);
     columnSchemas.add(date);
 
     ColumnSchema country = new ColumnSchema();
@@ -212,6 +213,7 @@ public class StoreCreator {
     country.setColumnUniqueId(UUID.randomUUID().toString());
     country.setDimensionColumn(true);
     country.setColumnGroup(3);
+    country.setSortColumn(true);
     columnSchemas.add(country);
 
     ColumnSchema name = new ColumnSchema();
@@ -222,6 +224,7 @@ public class StoreCreator {
     name.setColumnUniqueId(UUID.randomUUID().toString());
     name.setDimensionColumn(true);
     name.setColumnGroup(4);
+    name.setSortColumn(true);
     columnSchemas.add(name);
 
     ColumnSchema phonetype = new ColumnSchema();
@@ -232,6 +235,7 @@ public class StoreCreator {
     phonetype.setColumnUniqueId(UUID.randomUUID().toString());
     phonetype.setDimensionColumn(true);
     phonetype.setColumnGroup(5);
+    phonetype.setSortColumn(true);
     columnSchemas.add(phonetype);
 
     ColumnSchema serialname = new ColumnSchema();
@@ -242,6 +246,7 @@ public class StoreCreator {
     serialname.setColumnUniqueId(UUID.randomUUID().toString());
     serialname.setDimensionColumn(true);
     serialname.setColumnGroup(6);
+    serialname.setSortColumn(true);
     columnSchemas.add(serialname);
 
     ColumnSchema salary = new ColumnSchema();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
new file mode 100644
index 0000000..3b28453
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.sortcolumns
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestSortColumns extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+  }
+
+  test("create table with no dictionary sort_columns") {
+    sql("CREATE TABLE sorttable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empno')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select empno from sorttable1"), sql("select empno from sorttable1 order by empno"))
+  }
+
+  test("create table with dictionary sort_columns") {
+    sql("CREATE TABLE sorttable2 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empname')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select empname from sorttable2"),sql("select empname from origintable1"))
+  }
+
+  test("create table with direct-dictionary sort_columns") {
+    sql("CREATE TABLE sorttable3 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='doj')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select doj from sorttable3"), sql("select doj from sorttable3 order by doj"))
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap safe") {
+    try {
+      setLoadingProperties("true", "false", "false")
+      sql("CREATE TABLE sorttable4_offheap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_safe"), sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap and unsafe sort") {
+    try {
+      setLoadingProperties("true", "true", "false")
+      sql(
+        "CREATE TABLE sorttable4_offheap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_unsafe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap and inmemory sort") {
+    try {
+      setLoadingProperties("true", "false", "true")
+      sql(
+        "CREATE TABLE sorttable4_offheap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_inmemory"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap") {
+    try {
+      setLoadingProperties("false", "false", "false")
+      sql(
+        "CREATE TABLE sorttable4_heap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_safe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap and unsafe sort") {
+    try {
+      setLoadingProperties("false", "true", "false")
+      sql(
+        "CREATE TABLE sorttable4_heap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_unsafe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap and inmemory sort") {
+    try {
+      setLoadingProperties("false", "false", "true")
+      sql(
+        "CREATE TABLE sorttable4_heap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_inmemory"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("compaction on sort_columns table") {
+    sql("CREATE TABLE origintable2 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql("alter table origintable2 compact 'minor'")
+
+    sql("CREATE TABLE sorttable5 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empno')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql("alter table sorttable5 compact 'minor'")
+
+    checkAnswer(sql("select empno from sorttable5"), sql("select empno from origintable2 order by empno"))
+  }
+
+  test("filter on sort_columns include no-dictionary, direct-dictionary and dictionary") {
+    sql("CREATE TABLE sorttable6 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, doj, empname')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    // no dictionary
+    checkAnswer(sql("select * from sorttable6 where workgroupcategory = 1"), sql("select * from origintable1 where workgroupcategory = 1 order by doj"))
+    // direct dictionary
+    checkAnswer(sql("select * from sorttable6 where doj = '2007-01-17 00:00:00'"), sql("select * from origintable1 where doj = '2007-01-17 00:00:00'"))
+    // dictionary
+    checkAnswer(sql("select * from sorttable6 where empname = 'madhan'"), sql("select * from origintable1 where empname = 'madhan'"))
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists origintable1")
+    sql("drop table if exists origintable2")
+    sql("drop table if exists sorttable1")
+    sql("drop table if exists sorttable2")
+    sql("drop table if exists sorttable3")
+    sql("drop table if exists sorttable4_offheap_safe")
+    sql("drop table if exists sorttable4_offheap_unsafe")
+    sql("drop table if exists sorttable4_offheap_inmemory")
+    sql("drop table if exists sorttable4_heap_safe")
+    sql("drop table if exists sorttable4_heap_unsafe")
+    sql("drop table if exists sorttable4_heap_inmemory")
+    sql("drop table if exists sorttable5")
+    sql("drop table if exists sorttable6")
+  }
+
+  def setLoadingProperties(offheap: String, unsafe: String, useBatch: String): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, offheap)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, unsafe)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, useBatch)
+  }
+
+  def defaultLoadingProperties = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
index 29aa7e7..b5b0f1b 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -86,10 +86,11 @@ public class RowResultMerger {
     this.rawResultIteratorList = iteratorList;
     // create the List of RawResultIterator.
 
+    this.segprop = segProp;
+
     recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
         new RowResultMerger.CarbonMdkeyComparator());
 
-    this.segprop = segProp;
     this.tempStoreLocation = tempStoreLocation;
 
     this.factStoreLocation = loadModel.getStorePath();
@@ -317,6 +318,19 @@ public class RowResultMerger {
    * Comparator class for comparing 2 raw row result.
    */
   private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+    int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+    public CarbonMdkeyComparator() {
+      initSortColumns();
+    }
+
+    private void initSortColumns() {
+      int numberOfSortColumns = segprop.getNumberOfSortColumns();
+      if (numberOfSortColumns != columnValueSizes.length) {
+        int[] sortColumnValueSizes = new int[numberOfSortColumns];
+        System.arraycopy(columnValueSizes, 0, sortColumnValueSizes, 0, numberOfSortColumns);
+        this.columnValueSizes = sortColumnValueSizes;
+      }
+    }
 
     @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
 
@@ -334,7 +348,6 @@ public class RowResultMerger {
       ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
       ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
       int compareResult = 0;
-      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
       int dictionaryKeyOffset = 0;
       byte[] dimCols1 = key1.getDictionaryKey();
       byte[] dimCols2 = key2.getDictionaryKey();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 8120942..521054c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -239,21 +239,15 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     fields.zipWithIndex.foreach { x =>
       x._1.schemaOrdinal = x._2
     }
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
       fields, tableProperties)
     if (dims.isEmpty && !isAlterFlow) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-                                                +
-                                                " can not be created without key columns. Please " +
-                                                "use DICTIONARY_INCLUDE or " +
-                                                "DICTIONARY_EXCLUDE to set at least one key " +
-                                                "column " +
-                                                "if all specified columns are numeric types")
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
+        "if all specified columns are numeric types")
     }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -275,6 +269,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       tableProperties,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
+      Option(sortKeyDims),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
@@ -484,14 +479,45 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @param tableProperties
    * @return
    */
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
+  protected def extractDimAndMsrFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
+    // All columns in sortkey should be there in create table cols
+    val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortKeyDimsTmp: Seq[String] = Seq[String]()
+    if (sortKeyOption.isDefined) {
+      var sortKey = sortKeyOption.get.split(',').map(_.trim)
+      sortKey.foreach { column =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
+          val errormsg = "sort_columns: " + column +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        } else {
+          val dataType = fields.find(x =>
+            x.column.equalsIgnoreCase(column)).get.dataType.get
+          if (isComplexDimDictionaryExclude(dataType)) {
+            val errormsg = "sort_columns is unsupported for complex datatype column: " + column
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+      }
+
+      sortKey.foreach { dimension =>
+        if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) {
+          fields.foreach { field =>
+            if (field.column.equalsIgnoreCase(dimension)) {
+              sortKeyDimsTmp :+= field.column
+            }
+          }
+        }
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
@@ -530,7 +556,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       }
     }
 
-    // include cols should contain exclude cols
+    // include cols should not contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
@@ -552,10 +578,30 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
+      } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) {
+        noDictionaryDims :+= field.column
+        dimFields += field
+      } else {
+        msrFields :+= field
       }
     }
 
-    (dimFields.toSeq, noDictionaryDims)
+    var sortKeyDims = sortKeyDimsTmp
+    if (sortKeyOption.isEmpty) {
+      // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS.
+      dimFields.foreach { field =>
+        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+          sortKeyDims :+= field.column
+        }
+      }
+    }
+    if (sortKeyDims.isEmpty) {
+      // no SORT_COLUMNS
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "")
+    } else {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
+    }
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
   }
 
   /**
@@ -602,44 +648,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
-    }
-
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
-        }
-      }
-    })
-
-    msrFields
-  }
-
-  /**
    * Extract the DbName and table name.
    *
    * @param tableNameParts