Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/02 14:13:07 UTC

[43/50] [abbrv] incubator-carbondata git commit: add SORT_COLUMNS options
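
For context, SORT_COLUMNS is a new TBLPROPERTIES option that selects the columns CarbonData uses as the sort key during data load; those columns are also moved to the front of the stored column order. A minimal usage sketch in the style of the tests in this patch, with a hypothetical table name and columns:

  sql("CREATE TABLE t (id int, name string, ts timestamp) " +
    "STORED BY 'org.apache.carbondata.format' " +
    "TBLPROPERTIES('SORT_COLUMNS'='name, ts')")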

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
new file mode 100644
index 0000000..88b0421
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.sortcolumns
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestSortColumns extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+  }
+
+  test("create table with no dictionary sort_columns") {
+    sql("CREATE TABLE sorttable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empno')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select empno from sorttable1"), sql("select empno from sorttable1 order by empno"))
+  }
+
+  test("create table with dictionary sort_columns") {
+    sql("CREATE TABLE sorttable2 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empname')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select empname from sorttable2"),sql("select empname from origintable1"))
+  }
+
+  test("create table with direct-dictioanry sort_columns") {
+    sql("CREATE TABLE sorttable3 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='doj')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    checkAnswer(sql("select doj from sorttable3"), sql("select doj from sorttable3 order by doj"))
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap safe") {
+    try {
+      setLoadingProperties("true", "false", "false")
+      sql("CREATE TABLE sorttable4_offheap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_safe"), sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap and unsafe sort") {
+    try {
+      setLoadingProperties("true", "true", "false")
+      sql(
+        "CREATE TABLE sorttable4_offheap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_unsafe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with offheap and inmemory sort") {
+    try {
+      setLoadingProperties("true", "false", "true")
+      sql(
+        "CREATE TABLE sorttable4_offheap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_offheap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_offheap_inmemory"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap") {
+    try {
+      setLoadingProperties("false", "false", "false")
+      sql(
+        "CREATE TABLE sorttable4_heap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_safe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap and unsafe sort") {
+    try {
+      setLoadingProperties("false", "true", "false")
+      sql(
+        "CREATE TABLE sorttable4_heap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_unsafe"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("create table with multi-sort_columns and data loading with heap and inmemory sort") {
+    try {
+      setLoadingProperties("false", "false", "true")
+      sql(
+        "CREATE TABLE sorttable4_heap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, empname')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable4_heap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select workgroupcategory, empname from sorttable4_heap_inmemory"),
+        sql("select workgroupcategory, empname from origintable1 order by workgroupcategory"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("compaction on sort_columns table") {
+    sql("CREATE TABLE origintable2 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql("alter table origintable2 compact 'minor'")
+
+    sql("CREATE TABLE sorttable5 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='empno')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    sql("alter table sorttable5 compact 'minor'")
+
+    checkAnswer(sql("select empno from sorttable5"), sql("select empno from origintable2 order by empno"))
+  }
+
+  test("filter on sort_columns include no-dictionary, direct-dictionary and dictioanry") {
+    sql("CREATE TABLE sorttable6 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='workgroupcategory, doj, empname')")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+    // no dictionary
+    checkAnswer(sql("select * from sorttable6 where workgroupcategory = 1"), sql("select * from origintable1 where workgroupcategory = 1 order by doj"))
+    // direct dictionary
+    checkAnswer(sql("select * from sorttable6 where doj = '2007-01-17 00:00:00'"), sql("select * from origintable1 where doj = '2007-01-17 00:00:00'"))
+    // dictionary
+    checkAnswer(sql("select * from sorttable6 where empname = 'madhan'"), sql("select * from origintable1 where empname = 'madhan'"))
+  }
+
+  test("unsorted table creation, query data loading with heap and safe sort config") {
+    try {
+      setLoadingProperties("false", "false", "false")
+      sql("CREATE TABLE unsortedtable_heap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_heap_safe where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_heap_safe order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("unsorted table creation, query and data loading with heap and unsafe sort config") {
+    try {
+      setLoadingProperties("false", "true", "false")
+      sql("CREATE TABLE unsortedtable_heap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_heap_unsafe where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_heap_unsafe order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("unsorted table creation, query and loading with heap and inmemory sort config") {
+    try {
+      setLoadingProperties("false", "false", "true")
+      sql("CREATE TABLE unsortedtable_heap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_heap_inmemory where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_heap_inmemory order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("unsorted table creation, query and data loading with offheap and safe sort config") {
+    try {
+      setLoadingProperties("true", "false", "false")
+      sql("CREATE TABLE unsortedtable_offheap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_offheap_safe where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_offheap_safe order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("unsorted table creation, query and data loading with offheap and unsafe sort config") {
+    try {
+      setLoadingProperties("true", "true", "false")
+      sql("CREATE TABLE unsortedtable_offheap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_offheap_unsafe where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_offheap_unsafe order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  test("unsorted table creation, query and data loading with offheap and inmemory sort config") {
+    try {
+      setLoadingProperties("true", "false", "true")
+      sql("CREATE TABLE unsortedtable_offheap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+      checkAnswer(sql("select * from unsortedtable_offheap_inmemory where empno = 11"), sql("select * from origintable1 where empno = 11"))
+      checkAnswer(sql("select * from unsortedtable_offheap_inmemory order by empno"), sql("select * from origintable1 order by empno"))
+    } finally {
+      defaultLoadingProperties
+    }
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists origintable1")
+    sql("drop table if exists origintable2")
+    sql("drop table if exists sorttable1")
+    sql("drop table if exists sorttable2")
+    sql("drop table if exists sorttable3")
+    sql("drop table if exists sorttable4_offheap_safe")
+    sql("drop table if exists sorttable4_offheap_unsafe")
+    sql("drop table if exists sorttable4_offheap_inmemory")
+    sql("drop table if exists sorttable4_heap_safe")
+    sql("drop table if exists sorttable4_heap_unsafe")
+    sql("drop table if exists sorttable4_heap_inmemory")
+    sql("drop table if exists sorttable5")
+    sql("drop table if exists sorttable6")
+    sql("drop table if exists unsortedtable_offheap_safe")
+    sql("drop table if exists unsortedtable_offheap_unsafe")
+    sql("drop table if exists unsortedtable_offheap_inmemory")
+    sql("drop table if exists unsortedtable_heap_safe")
+    sql("drop table if exists unsortedtable_heap_unsafe")
+    sql("drop table if exists unsortedtable_heap_inmemory")
+  }
+
+  def setLoadingProperties(offheap: String, unsafe: String, useBatch: String): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, offheap)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, unsafe)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, useBatch)
+  }
+
+  def defaultLoadingProperties = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)
+  }
+}
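
Each loading-configuration test above wraps setLoadingProperties in try/finally so the defaults are restored even when an assertion fails. A hypothetical loan-pattern helper (not part of this patch) could factor out that repetition:

  def withLoadingProperties(offheap: String, unsafe: String, useBatch: String)(body: => Unit): Unit = {
    setLoadingProperties(offheap, unsafe, useBatch)
    try {
      body                      // run the test with the overridden load configuration
    } finally {
      defaultLoadingProperties  // always restore the defaults, even on failure
    }
  }

  // usage: withLoadingProperties("true", "false", "false") { /* sql(...) and checkAnswer(...) */ }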

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index dad5716..23d1b58 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -240,21 +240,15 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     fields.zipWithIndex.foreach { x =>
       x._1.schemaOrdinal = x._2
     }
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
       fields, tableProperties)
     if (dims.isEmpty && !isAlterFlow) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-                                                +
-                                                " can not be created without key columns. Please " +
-                                                "use DICTIONARY_INCLUDE or " +
-                                                "DICTIONARY_EXCLUDE to set at least one key " +
-                                                "column " +
-                                                "if all specified columns are numeric types")
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
+        "if all specified columns are numeric types")
     }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -276,6 +270,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       tableProperties,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
+      Option(sortKeyDims),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
@@ -485,14 +480,50 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @param tableProperties
    * @return
    */
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
+  protected def extractDimAndMsrFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
+    // All columns in sort_columns must exist among the create table columns
+    val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortKeyDimsTmp: Seq[String] = Seq[String]()
+    val sortKeyString: String = if (sortKeyOption.isDefined) {
+      CarbonUtil.unquoteChar(sortKeyOption.get).trim
+    } else {
+      ""
+    }
+    if (!sortKeyString.isEmpty) {
+      val sortKey = sortKeyString.split(',').map(_.trim)
+      sortKey.foreach { column =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
+          val errormsg = "sort_columns: " + column +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        } else {
+          val dataType = fields.find(x =>
+            x.column.equalsIgnoreCase(column)).get.dataType.get
+          if (isComplexDimDictionaryExclude(dataType)) {
+            val errormsg = "sort_columns is unsupported for complex datatype column: " + column
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+      }
+
+      sortKey.foreach { dimension =>
+        if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) {
+          fields.foreach { field =>
+            if (field.column.equalsIgnoreCase(dimension)) {
+              sortKeyDimsTmp :+= field.column
+            }
+          }
+        }
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
@@ -531,7 +562,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       }
     }
 
-    // include cols should contain exclude cols
+    // include cols should not contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
@@ -553,10 +584,30 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
+      } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) {
+        noDictionaryDims :+= field.column
+        dimFields += field
+      } else {
+        msrFields :+= field
       }
     }
 
-    (dimFields.toSeq, noDictionaryDims)
+    var sortKeyDims = sortKeyDimsTmp
+    if (sortKeyOption.isEmpty) {
+      // if SORT_COLUMNS was not defined, add all dimensions to SORT_COLUMNS.
+      dimFields.foreach { field =>
+        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+          sortKeyDims :+= field.column
+        }
+      }
+    }
+    if (sortKeyDims.isEmpty) {
+      // no SORT_COLUMNS
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "")
+    } else {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
+    }
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
   }
 
   /**
@@ -603,44 +654,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
-    }
-
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
-        }
-      }
-    })
-
-    msrFields
-  }
-
-  /**
    * Extract the DbName and table name.
    *
    * @param tableNameParts

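The new extractDimAndMsrFields validates every SORT_COLUMNS entry against the declared fields and deduplicates entries case-insensitively while preserving the declared casing. A condensed sketch of just that validation step, using a simplified Field and a generic exception in place of MalformedCarbonCommandException:

  case class Field(column: String, dataType: Option[String])

  def extractSortKeyDims(fields: Seq[Field], sortColumns: String): Seq[String] =
    sortColumns.split(',').map(_.trim).filter(_.nonEmpty)
      .foldLeft(Seq.empty[String]) { (acc, col) =>
        // every sort column must exist among the create-table columns
        val field = fields.find(_.column.equalsIgnoreCase(col)).getOrElse(
          throw new IllegalArgumentException(
            s"sort_columns: $col does not exist in table"))
        // skip case-insensitive duplicates; keep the field's declared casing
        if (acc.exists(_.equalsIgnoreCase(field.column))) acc else acc :+ field.column
      }
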
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e03cbe5..187512d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ case class TableModel(
     tableProperties: Map[String, String],
     dimCols: Seq[Field],
     msrCols: Seq[Field],
+    sortKeyDims: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
@@ -348,6 +349,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
     columnSchema.setSchemaOrdinal(schemaOrdinal)
+    columnSchema.setSortColumn(false)
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
     columnSchema
@@ -358,7 +360,11 @@ class TableNewProcessor(cm: TableModel) {
     val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
     var allColumns = Seq[ColumnSchema]()
     var index = 0
-    cm.dimCols.foreach(field => {
+    var measureCount = 0
+
+    // Sort columns should be at the beginning of all columns
+    cm.sortKeyDims.get.foreach { keyDim =>
+      val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = new java.util.ArrayList[Encoding]()
       encoders.add(Encoding.DICTIONARY)
       val columnSchema: ColumnSchema = getColumnSchema(
@@ -372,11 +378,33 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      allColumns ++= Seq(columnSchema)
+      columnSchema.setSortColumn(true)
+      allColumns :+= columnSchema
       index = index + 1
-      if (field.children.isDefined && field.children.get != null) {
-        columnSchema.setNumberOfChild(field.children.get.size)
-        allColumns ++= getAllChildren(field.children)
+    }
+
+    cm.dimCols.foreach(field => {
+      val sortField = cm.sortKeyDims.get.find(field.column equals _)
+      if (sortField.isEmpty) {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        val columnSchema: ColumnSchema = getColumnSchema(
+          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column),
+          index,
+          isCol = true,
+          encoders,
+          isDimensionCol = true,
+          -1,
+          field.precision,
+          field.scale,
+          field.schemaOrdinal)
+        allColumns :+= columnSchema
+        index = index + 1
+        if (field.children.isDefined && field.children.get != null) {
+          columnSchema.setNumberOfChild(field.children.get.size)
+          allColumns ++= getAllChildren(field.children)
+        }
       }
     })
 
@@ -393,10 +421,9 @@ class TableNewProcessor(cm: TableModel) {
         field.precision,
         field.scale,
         field.schemaOrdinal)
-      val measureCol = columnSchema
-
-      allColumns ++= Seq(measureCol)
+      allColumns :+= columnSchema
       index = index + 1
+      measureCount += 1
     })
 
     // Check if there is any duplicate measures or dimensions.
@@ -417,22 +444,6 @@ class TableNewProcessor(cm: TableModel) {
 
     updateColumnGroupsInFields(cm.columnGroups, allColumns)
 
-    var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    val measures = scala.collection.mutable.ListBuffer[ColumnSchema]()
-    for (column <- allColumns) {
-      if (highCardinalityDims.contains(column.getColumnName)) {
-        newOrderedDims += column
-      } else if (column.isComplex) {
-        complexDims += column
-      } else if (column.isDimensionColumn) {
-        newOrderedDims += column
-      } else {
-        measures += column
-      }
-
-    }
-
     // Setting the boolean value of useInvertedIndex in column schema
     val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
     for (column <- allColumns) {
@@ -447,7 +458,7 @@ class TableNewProcessor(cm: TableModel) {
     }
 
     // Adding dummy measure if no measure is provided
-    if (measures.size < 1) {
+    if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
       val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
@@ -457,13 +468,10 @@ class TableNewProcessor(cm: TableModel) {
         false,
         -1, 0, 0, schemaOrdinal = -1)
       columnSchema.setInvisible(true)
-      val measureColumn = columnSchema
-      measures += measureColumn
-      allColumns = allColumns ++ measures
+      allColumns :+= columnSchema
     }
     val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
     columnValidator.validateColumns(allColumns)
-    newOrderedDims = newOrderedDims ++ complexDims ++ measures
 
     val tableInfo = new TableInfo()
     val tableSchema = new TableSchema()

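In TableNewProcessor above, sort columns are now emitted first in the physical column order, followed by the remaining dimensions and then the measures (with a dummy measure appended when none exists). The ordering invariant as a standalone sketch over column names:

  def orderColumns(sortKeyDims: Seq[String], dims: Seq[String], msrs: Seq[String]): Seq[String] =
    sortKeyDims ++                                                      // sort columns lead
      dims.filterNot(d => sortKeyDims.exists(_.equalsIgnoreCase(d))) ++ // then the remaining dimensions, in declared order
      msrs                                                              // measures come last

  // orderColumns(Seq("b"), Seq("a", "b", "c"), Seq("m")) == Seq("b", "a", "c", "m")
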
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
index 8588868..b8f0a7c 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala
@@ -32,23 +32,15 @@ private class TestCarbonSqlParserStub extends CarbonSqlParser {
 
   def updateColumnGroupsInFieldTest(fields: Seq[Field], tableProperties: Map[String, String]): Seq[String] = {
 
-     var (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
-      fields, tableProperties)
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+     var (dims, msrs, noDictionaryDims, sortkey) = extractDimAndMsrFields(fields, tableProperties)
 
     updateColumnGroupsInField(tableProperties,
         noDictionaryDims, msrs, dims)
   }
 
-  def extractDimColsAndNoDictionaryFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field],
-    Seq[String]) = {
-
-    extractDimColsAndNoDictionaryFields(fields, tableProperties)
-  }
-
-  def extractMsrColsFromFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field]) = {
-
-    extractMsrColsFromFields(fields, tableProperties)
+  def extractDimAndMsrFieldsTest(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+    extractDimAndMsrFields(fields, tableProperties)
   }
 
 
@@ -199,7 +191,7 @@ class TestCarbonSqlParser extends QueryTest {
     val fields: Seq[Field] = loadAllFields
 
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub.extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
+    val (dimCols, _, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
 
@@ -219,9 +211,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -242,9 +232,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -264,9 +252,8 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields,
+      tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -287,9 +274,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col3", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -310,9 +295,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -333,9 +316,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -358,9 +339,7 @@ class TestCarbonSqlParser extends QueryTest {
     )
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 8)
@@ -382,9 +361,7 @@ class TestCarbonSqlParser extends QueryTest {
     val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE-> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val (dimCols, noDictionary) = stub
-      .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties)
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     //below dimension fields should be available in dimensions list
     assert(dimCols.size == 7)
@@ -402,10 +379,11 @@ class TestCarbonSqlParser extends QueryTest {
 
   // Testing the extracting of measures
   test("Test-extractMsrColsFromFields") {
-    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
+    val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2",
+      CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1")
     val fields: Seq[Field] = loadAllFields
     val stub = new TestCarbonSqlParserStub()
-    val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties)
+    val (_, msrCols, _, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties)
 
     // testing col
     assert(msrCols.lift(0).get.column.equalsIgnoreCase("col4"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index b848543..b4b462a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -28,18 +28,25 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
 
+  boolean[] isMeasure;
+
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
     //can initialize and generate schema here.
+    isMeasure = new boolean[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      isMeasure[i] = !carbonColumns[i].isDimesion();
+      dataTypes[i] = carbonColumns[i].getDataType();
+    }
   }
 
   @Override public InternalRow readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
+    for (int i = 0; i < isMeasure.length; i++) {
       if (data[i] == null) {
         continue;
       }
-      if (dictionaries[i] == null) {
+      if (isMeasure[i]) {
         if (dataTypes[i].equals(DataType.INT)) {
           data[i] = ((Long)(data[i])).intValue();
         } else if (dataTypes[i].equals(DataType.SHORT)) {

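readRow above now keys the type narrowing on a precomputed isMeasure flag array instead of checking for a null dictionary: measure values arrive as Long and are downcast to the declared type. The same narrowing expressed in Scala (a sketch, not the shipped Java):

  def narrowMeasure(value: Long, dataType: String): AnyVal = dataType match {
    case "INT"   => value.toInt   // Long -> Int for INT measures
    case "SHORT" => value.toShort // Long -> Short for SHORT measures
    case _       => value         // other measure types pass through unchanged
  }
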
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index f94c0b2..ad5e01f 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.vectorreader;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
@@ -30,6 +31,14 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     this.columnVector = columnVector;
   }
 
+  @Override public void putBoolean(int rowId, boolean value) {
+    columnVector.putBoolean(rowId, value);
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    columnVector.putFloat(rowId, value);
+  }
+
   @Override public void putShort(int rowId, short value) {
     columnVector.putShort(rowId, value);
   }
@@ -110,4 +119,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   @Override public void reset() {
 //    columnVector.reset();
   }
+
+  @Override public DataType getType() {
+    return columnVector.dataType();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index f8bdcf8..9e14f16 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -51,18 +51,54 @@ object TableCreator {
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
   }
 
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-                                                    tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
+  protected def extractDimAndMsrFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
 
+    // All columns in sort_columns must exist among the create table columns
+    val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+    var sortKeyDimsTmp: Seq[String] = Seq[String]()
+    val sortKeyString: String = if (sortKeyOption.isDefined) {
+      CarbonUtil.unquoteChar(sortKeyOption.get).trim
+    } else {
+      ""
+    }
+    if (!sortKeyString.isEmpty) {
+      val sortKey = sortKeyString.split(',').map(_.trim)
+      sortKey.foreach { column =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
+          val errormsg = "sort_columns: " + column +
+            " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        } else {
+          val dataType = fields.find(x =>
+            x.column.equalsIgnoreCase(column)).get.dataType.get
+          if (isComplexDimDictionaryExclude(dataType)) {
+            val errormsg = "sort_columns is unsupported for complex datatype column: " + column
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+      }
+
+      sortKey.foreach { dimension =>
+        if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) {
+          fields.foreach { field =>
+            if (field.column.equalsIgnoreCase(dimension)) {
+              sortKeyDimsTmp :+= field.column
+            }
+          }
+        }
+      }
+    }
+
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
       dictExcludeCols
         .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
@@ -97,7 +133,7 @@ object TableCreator {
       }
     }
 
-    // include cols should contain exclude cols
+    // include cols should not contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
@@ -108,11 +144,10 @@ object TableCreator {
 
     // by default consider all String cols as dims and if any dictionary exclude is present then
     // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach(field => {
-
+    fields.foreach { field =>
       if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP &&
-            DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.DATE) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE) {
           noDictionaryDims :+= field.column
         }
         dimFields += field
@@ -120,49 +155,30 @@ object TableCreator {
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
+      } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) {
+        noDictionaryDims :+= field.column
+        dimFields += field
+      } else {
+        msrFields :+= field
       }
     }
-    )
-
-    (dimFields.toSeq, noDictionaryDims)
-  }
-
-  /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-                                         tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
-    }
-
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
-    }
 
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-          !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
+    var sortKeyDims = sortKeyDimsTmp
+    if (sortKeyOption.isEmpty) {
+      // if SORT_COLUMNS was not defined, add all dimensions to SORT_COLUMNS.
+      dimFields.foreach { field =>
+        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+          sortKeyDims :+= field.column
         }
       }
-    })
-
-    msrFields
+    }
+    if (sortKeyDims.isEmpty) {
+      // no SORT_COLUMNS
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "")
+    } else {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
+    }
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
   }
 
   def getKey(parentColumnName: Option[String],
@@ -440,27 +456,24 @@ object TableCreator {
   }
 
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-                        , tableName: String, fields: Seq[Field],
-                        partitionCols: Seq[PartitionerField],
-                        bucketFields: Option[BucketFields],
-                        tableProperties: Map[String, String]): TableModel
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      bucketFields: Option[BucketFields],
+      tableProperties: Map[String, String]): TableModel
   = {
 
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+    fields.zipWithIndex.foreach { x =>
+      x._1.schemaOrdinal = x._2
+    }
+    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
       fields, tableProperties)
     if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-        +
-        " can not be created without key columns. Please " +
-        "use DICTIONARY_INCLUDE or " +
-        "DICTIONARY_EXCLUDE to set at least one key " +
-        "column " +
+      throw new MalformedCarbonCommandException(
+        s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
+        "can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
+        "DICTIONARY_EXCLUDE to set at least one key column " +
         "if all specified columns are numeric types")
     }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
@@ -474,18 +487,20 @@ object TableCreator {
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
-    TableModel(ifNotExistPresent,
+    TableModel(
+      ifNotExistPresent,
       dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
       tableName,
       tableProperties,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
+      Option(sortKeyDims),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields)
+      bucketFields: Option[BucketFields])
   }
 
 }
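
To make the new defaulting rule concrete: when SORT_COLUMNS is absent, every non-complex dimension becomes a sort column, and the resolved list is written back into the table properties (an empty string marks an unsorted table). Below is a small, self-contained Java sketch of that behavior; the names here are illustrative and not part of the patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of the SORT_COLUMNS defaulting implemented above.
    final class SortColumnsDefaulting {
      static List<String> resolveSortColumns(Map<String, String> tableProperties,
          List<String> dimensionCols, List<String> complexDimCols) {
        String declared = tableProperties.get("sort_columns");
        List<String> sortCols = new ArrayList<>();
        if (declared == null) {
          // SORT_COLUMNS absent: every non-complex dimension becomes a sort column
          for (String dim : dimensionCols) {
            if (!complexDimCols.contains(dim)) {
              sortCols.add(dim);
            }
          }
        } else if (!declared.trim().isEmpty()) {
          for (String col : declared.split(",")) {
            sortCols.add(col.trim());
          }
        }
        // persist the resolved value; "" means an unsorted table
        tableProperties.put("sort_columns", String.join(",", sortCols));
        return sortCols;
      }
    }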

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index a76cc0a..a8f9520 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -359,10 +359,18 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private void initializeFinalThreadMergerForMergeSort() {
     String sortTempFileLocation = tempStoreLocation + CarbonCommonConstants.FILE_SEPARATOR
         + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    boolean[] noDictionarySortColumnMapping = null;
+    if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) {
+      noDictionarySortColumnMapping = noDictionaryColMapping;
+    } else {
+      noDictionarySortColumnMapping = new boolean[this.segmentProperties.getNumberOfSortColumns()];
+      System.arraycopy(noDictionaryColMapping, 0,
+          noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
+    }
     finalMerger =
         new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, dimensionColumnCount,
             segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
-            aggType, noDictionaryColMapping);
+            aggType, noDictionaryColMapping, noDictionarySortColumnMapping);
   }
 
   /**
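
The new noDictionarySortColumnMapping is just the leading slice of the full dimension mapping, which works because sort columns occupy the leading positions of the dimension order (as the arraycopy above implies). A standalone sketch of the same prefix extraction, with an illustrative method name:

    // Sketch of the prefix extraction in the hunk above.
    static boolean[] noDictSortColumnFlags(boolean[] noDictionaryColMapping,
        int numberOfSortColumns) {
      if (noDictionaryColMapping.length == numberOfSortColumns) {
        return noDictionaryColMapping;     // every dimension is a sort column
      }
      boolean[] prefix = new boolean[numberOfSortColumns];
      System.arraycopy(noDictionaryColMapping, 0, prefix, 0, numberOfSortColumns);
      return prefix;                       // flags for the leading sort columns only
    }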

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b7aa32c..ce85227 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -183,6 +183,19 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    * Comparator class for comparing 2 raw row result.
    */
   private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+    int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+
+    public CarbonMdkeyComparator() {
+      initSortColumns();
+    }
+
+    private void initSortColumns() {
+      int numberOfSortColumns = segprop.getNumberOfSortColumns();
+      if (numberOfSortColumns != columnValueSizes.length) {
+        int[] sortColumnValueSizes = new int[numberOfSortColumns];
+        System.arraycopy(columnValueSizes, 0, sortColumnValueSizes, 0, numberOfSortColumns);
+        this.columnValueSizes = sortColumnValueSizes;
+      }
+    }
 
     @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
 
@@ -200,7 +213,6 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
       ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
       ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
       int compareResult = 0;
-      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
       int dictionaryKeyOffset = 0;
       byte[] dimCols1 = key1.getDictionaryKey();
       byte[] dimCols2 = key2.getDictionaryKey();
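
With initSortColumns truncating columnValueSizes, the comparator walks only the sort-column prefix of each mdkey. A plain-Java sketch of that per-column walk, assuming fixed column widths and unsigned byte order (the real code uses Carbon's byte-comparison helpers):

    // Sketch: compare two dictionary mdkeys column by column over the (possibly
    // truncated) columnValueSizes array, treating bytes as unsigned.
    static int compareDictionaryKeys(byte[] key1, byte[] key2, int[] columnValueSizes) {
      int offset = 0;
      for (int width : columnValueSizes) {
        for (int i = 0; i < width; i++) {
          int diff = (key1[offset + i] & 0xFF) - (key2[offset + i] & 0xFF);
          if (diff != 0) {
            return diff;
          }
        }
        offset += width;
      }
      return 0;   // equal on all sort columns
    }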

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 12be777..e1eb071 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -80,6 +80,10 @@ public class CarbonDataLoadConfiguration {
 
   private DictionaryCardinalityFinder cardinalityFinder;
 
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -125,6 +129,26 @@ public class CarbonDataLoadConfiguration {
     return dimCount;
   }
 
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  public int getNumberOfSortColumns() {
+    return this.numberOfSortColumns;
+  }
+
+  public boolean isSortTable() {
+    return this.numberOfSortColumns > 0;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return this.numberOfNoDictSortColumns;
+  }
+
   public int getComplexDimensionCount() {
     int dimCount = 0;
     for (int i = 0; i < dataFields.length; i++) {
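
Usage note for the new accessors: isSortTable() simply means "at least one sort column", e.g.:

    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
    configuration.setNumberOfSortColumns(2);        // e.g. SORT_COLUMNS lists two columns
    configuration.setNumberOfNoDictSortColumns(1);  // one of them is no-dictionary
    assert configuration.isSortTable();             // true: 2 > 0, so the sort step runs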

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 8865518..11e5db1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWith
 import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.NoSortProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -76,9 +77,10 @@ public final class DataLoadProcessBuilder {
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data which are part of key (all dimensions except complex types)
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 3. Sorts the data on SORT_COLUMNS if the table defines any; otherwise skips sorting
+    AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
+        new SortProcessorStepImpl(configuration, converterProcessorStep) :
+        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
@@ -94,9 +96,10 @@ public final class DataLoadProcessBuilder {
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data which are part of key (all dimensions except complex types)
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 3. Sorts the data on SORT_COLUMNS if the table defines any; otherwise skips sorting
+    AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
+        new SortProcessorStepImpl(configuration, converterProcessorStep) :
+        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
@@ -112,9 +115,10 @@ public final class DataLoadProcessBuilder {
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data which are part of key (all dimensions except complex types)
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 3. Sorts the data on SORT_COLUMNS if the table defines any; otherwise skips sorting
+    AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
+        new SortProcessorStepImpl(configuration, converterProcessorStep) :
+        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
@@ -193,6 +197,8 @@ public final class DataLoadProcessBuilder {
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
     configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
     configuration.setPreFetch(loadModel.isPreFetch());
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
 
     return configuration;
   }
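
NoSortProcessorStepImpl is imported above but its body is not part of this diff. Conceptually it is a pass-through step that forwards converted row batches without sorting, letting the writer consume load order directly. A hypothetical sketch only; the overrides and signatures here are assumptions, not quotes from the class:

    import java.util.Iterator;

    // Hypothetical sketch -- the real NoSortProcessorStepImpl may differ in detail.
    public class NoSortProcessorStepImpl extends AbstractDataLoadProcessorStep {

      public NoSortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
          AbstractDataLoadProcessorStep child) {
        super(configuration, child);
      }

      @Override public Iterator<CarbonRowBatch>[] execute() {
        // no sorting: hand the child's iterators straight to the next step
        return child.execute();
      }
    }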

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 9e4b50d..3accb0b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.processing.newflow.converter.impl;
 
-import java.nio.charset.Charset;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -48,23 +46,24 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
     this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
     if (dimensionValue == null || dimensionValue.equals(nullformat)) {
-      dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-    }
-    if (dataType != DataType.STRING) {
-      if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
-        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+    } else {
+      try {
+        row.update(DataTypeUtil
+            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType), index);
+      } catch (Throwable ex) {
+        if (dimensionValue.length() != 0 || isEmptyBadRecord) {
           logHolder.setReason(
               "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
                   .getColName() + " and column data type " + dataType + " is not a valid "
                   + dataType + " type.");
+        } else {
+          row.update(new byte[0], index);
         }
       }
     }
-    row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
-        index);
   }
 }
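
The converter now writes type-aware bytes for no-dictionary columns instead of raw UTF-8 strings, so numeric values stay binary-comparable. A hedged sketch of what such a conversion can look like; the actual DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn may differ in encoding details:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Hedged sketch; not the actual DataTypeUtil implementation.
    static byte[] toNoDictionaryBytes(String value, DataType dataType) {
      switch (dataType) {
        case INT:
          // fixed 4-byte encoding keeps values comparable as bytes
          return ByteBuffer.allocate(4).putInt(Integer.parseInt(value)).array();
        case LONG:
          return ByteBuffer.allocate(8).putLong(Long.parseLong(value)).array();
        default:
          // strings and unhandled types fall back to UTF-8 bytes
          return value.getBytes(StandardCharsets.UTF_8);
      }
    }

A parse failure throws, which the catch (Throwable ex) path above turns into either a bad record or an empty value.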

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 856b6ac..0e14660 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -83,7 +83,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
             sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
             sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
-            sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getNoDictionarySortColumn());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e5af1c6..19b1cfe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -143,7 +143,8 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
             sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            this.sortParameters.getNoDictionarySortColumn());
     return finalMerger;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index e682263..44f11f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -34,6 +34,8 @@ public class UnsafeCarbonRowPage {
 
   private boolean[] noDictionaryDimensionMapping;
 
+  private boolean[] noDictionarySortColumnMapping;
+
   private int dimensionSize;
 
   private int measureSize;
@@ -52,9 +54,11 @@ public class UnsafeCarbonRowPage {
 
   private boolean saveToDisk;
 
-  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, int dimensionSize,
-      int measureSize, char[] aggType, MemoryBlock memoryBlock, boolean saveToDisk) {
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+      MemoryBlock memoryBlock, boolean saveToDisk) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
     this.measureSize = measureSize;
     this.aggType = aggType;
@@ -324,4 +328,7 @@ public class UnsafeCarbonRowPage {
     return noDictionaryDimensionMapping;
   }
 
+  public boolean[] getNoDictionarySortColumnMapping() {
+    return noDictionarySortColumnMapping;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index cb24968..40608fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -113,6 +113,7 @@ public class UnsafeSortDataRows {
   public void initialize() throws CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSize);
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+        parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
         parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
         !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
@@ -178,6 +179,7 @@ public class UnsafeSortDataRows {
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
             rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+                parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount(),
                 parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
             bytesAdded += rowPage.addRow(rowBatch[i]);
@@ -205,12 +207,12 @@ public class UnsafeSortDataRows {
     if (this.rowPage.getUsedSize() > 0) {
       TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
           new UnsafeIntSortDataFormat(rowPage));
-      if (parameters.getNoDictionaryCount() > 0) {
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), rowPage));
+            new UnsafeRowComparatorForNormalDIms(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -295,12 +297,13 @@ public class UnsafeSortDataRows {
         long startTime = System.currentTimeMillis();
         TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
             new UnsafeIntSortDataFormat(page));
-        if (parameters.getNoDictionaryCount() > 0) {
+        // if any sort column is no-dictionary, compare raw bytes; else compare dict surrogates
+        if (parameters.getNumberOfNoDictSortColumns() > 0) {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), page));
+              new UnsafeRowComparatorForNormalDIms(page));
         }
         if (rowPage.isSaveToDisk()) {
           // create a new file every time

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
index 8f048bd..1d60b11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -27,14 +27,14 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
 
   /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   * per sort column: true if the column is no-dictionary, false if dictionary encoded.
    */
-  private boolean[] noDictionaryColMaping;
+  private boolean[] noDictionarySortColumnMapping;
 
   private Object baseObject;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionaryColMaping = rowPage.getNoDictionaryDimensionMapping();
+    this.noDictionarySortColumnMapping = rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
   }
 
@@ -47,7 +47,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMapping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA);
         byte[] byteArr1 = new byte[aShort1];
@@ -92,7 +92,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (boolean isNoDictionary : noDictionaryColMaping) {
+    for (boolean isNoDictionary : noDictionarySortColumnMapping) {
       if (isNoDictionary) {
         short aShort1 = CarbonUnsafe.unsafe.getShort(baseObjectL, rowA + sizeA);
         byte[] byteArr1 = new byte[aShort1];
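
The hunks above show only the opening of the compare methods. For readers following the row layout, here is a hedged reconstruction of the whole loop; each no-dictionary sort column is stored as a 2-byte length followed by its bytes, each dictionary sort column as a fixed 4-byte surrogate. Helper names such as UnsafeComparer and BYTE_ARRAY_OFFSET are assumed from context, not quoted from this diff:

    public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
      long rowA = rowL.address;
      long rowB = rowR.address;
      int sizeA = 0;
      int sizeB = 0;
      for (boolean isNoDictionary : noDictionarySortColumnMapping) {
        if (isNoDictionary) {
          // 2-byte length prefix, then the value bytes, for each side
          short lenA = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA);
          byte[] byteArr1 = new byte[lenA];
          sizeA += 2;
          CarbonUnsafe.unsafe.copyMemory(baseObject, rowA + sizeA, byteArr1,
              CarbonUnsafe.BYTE_ARRAY_OFFSET, lenA);
          sizeA += lenA;

          short lenB = CarbonUnsafe.unsafe.getShort(baseObject, rowB + sizeB);
          byte[] byteArr2 = new byte[lenB];
          sizeB += 2;
          CarbonUnsafe.unsafe.copyMemory(baseObject, rowB + sizeB, byteArr2,
              CarbonUnsafe.BYTE_ARRAY_OFFSET, lenB);
          sizeB += lenB;

          int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
          if (difference != 0) {
            return difference;
          }
        } else {
          // dictionary columns are fixed 4-byte surrogates
          int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
          sizeA += 4;
          int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB);
          sizeB += 4;
          if (dimFieldA != dimFieldB) {
            return dimFieldA > dimFieldB ? 1 : -1;
          }
        }
      }
      return 0;
    }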

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
index 7448aee..4fd245f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
@@ -27,11 +27,11 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
 
   private Object baseObject;
 
-  private int dimCount;
+  private int numberOfSortColumns;
 
-  public UnsafeRowComparatorForNormalDIms(int dimCount, UnsafeCarbonRowPage rowPage) {
+  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
     this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.dimCount = dimCount;
+    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
   }
 
   /**
@@ -43,7 +43,7 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon
     long rowB = rowR.address;
     int sizeA = 0;
     int sizeB = 0;
-    for (int i = 0; i < dimCount; i++) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
       int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
       sizeA += 4;
       int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB);
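
For completeness, a hedged reconstruction of the full method around the hunk above (the tail is inferred, not quoted): with every sort column dictionary-encoded, ordering reduces to comparing 4-byte surrogates column by column.

    public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
      long rowA = rowL.address;
      long rowB = rowR.address;
      int sizeA = 0;
      int sizeB = 0;
      for (int i = 0; i < numberOfSortColumns; i++) {
        // each sort column is a fixed 4-byte dictionary surrogate
        int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA);
        sizeA += 4;
        int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB);
        sizeB += 4;
        if (dimFieldA != dimFieldB) {
          return dimFieldA > dimFieldB ? 1 : -1;
        }
      }
      return 0;
    }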

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index ed9e0a6..397de63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -45,13 +45,13 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
   private int columnSize;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictMapping, int columnSize) {
+      boolean[] noDictSortColumnMapping, int columnSize) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(noDictMapping);
+    this.comparator = new NewRowComparator(noDictSortColumnMapping);
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
index f491623..048f4f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -41,11 +41,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
   private int columnSize;
 
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) {
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
+      int numberOfSortColumns) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping());
+    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
     this.columnSize = columnSize;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index de2b874..84aa029 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -153,7 +153,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.aggType = parameters.getAggType();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
-    comparator = new NewRowComparator(isNoDictionaryDimensionColumn);
+    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7dafae7f/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 95a337a..c7d2ce4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -131,7 +131,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
 
         SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
             parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                .getMeasureColCount());
+                .getMeasureColCount(), parameters.getNumberOfSortColumns());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -142,7 +142,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
                     .getMeasureColCount());