You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/18 13:38:16 UTC

[2/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index 29e3060..29a4098 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
@@ -61,8 +62,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D
   }
 
   @Override
-  protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
-    return value;
+  protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
+    // no dictionary primitive columns carry the original value, so convert it to bytes
+    if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+      return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
+    }
+    return (byte[]) value;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index cad9787..61bd036 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.Predicate;
@@ -73,11 +74,14 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
     }
   }
 
-  protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
+  protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
     if (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) {
-      return DataConvertUtil.getRawBytesForVarchar(value);
+      return DataConvertUtil.getRawBytesForVarchar((byte[]) value);
+    } else if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+      // get bytes for the original value of the no dictionary column
+      return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
     } else {
-      return DataConvertUtil.getRawBytes(value);
+      return DataConvertUtil.getRawBytes((byte[]) value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 7cd241a..5525941 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -245,7 +245,7 @@ public class StoreCreator {
     date.setEncodingList(encodings);
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
-    date.setColumnReferenceId(id.getColumnUniqueId());
+    date.setColumnReferenceId(date.getColumnUniqueId());
     date.setSchemaOrdinal(schemaOrdinal++);
     if (sortColumns.contains(date.getColumnName())) {
       date.setSortColumn(true);
@@ -263,7 +263,7 @@ public class StoreCreator {
     if (sortColumns.contains(country.getColumnName())) {
       country.setSortColumn(true);
     }
-    country.setColumnReferenceId(id.getColumnUniqueId());
+    country.setColumnReferenceId(country.getColumnUniqueId());
     columnSchemas.add(country);
 
     ColumnSchema name = new ColumnSchema();
@@ -276,7 +276,7 @@ public class StoreCreator {
     if (sortColumns.contains(name.getColumnName())) {
       name.setSortColumn(true);
     }
-    name.setColumnReferenceId(id.getColumnUniqueId());
+    name.setColumnReferenceId(name.getColumnUniqueId());
     columnSchemas.add(name);
 
     ColumnSchema phonetype = new ColumnSchema();
@@ -289,7 +289,7 @@ public class StoreCreator {
     if (sortColumns.contains(phonetype.getColumnName())) {
       phonetype.setSortColumn(true);
     }
-    phonetype.setColumnReferenceId(id.getColumnUniqueId());
+    phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
     columnSchemas.add(phonetype);
 
     ColumnSchema serialname = new ColumnSchema();
@@ -302,7 +302,7 @@ public class StoreCreator {
     if (sortColumns.contains(serialname.getColumnName())) {
       serialname.setSortColumn(true);
     }
-    serialname.setColumnReferenceId(id.getColumnUniqueId());
+    serialname.setColumnReferenceId(serialname.getColumnUniqueId());
     columnSchemas.add(serialname);
     ColumnSchema salary = new ColumnSchema();
     salary.setColumnName("salary");
@@ -310,11 +310,13 @@ public class StoreCreator {
     salary.setEncodingList(new ArrayList<Encoding>());
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);
-    salary.setColumnReferenceId(id.getColumnUniqueId());
+    salary.setColumnReferenceId(salary.getColumnUniqueId());
     salary.setSchemaOrdinal(schemaOrdinal++);
     columnSchemas.add(salary);
 
-    tableSchema.setListOfColumns(columnSchemas);
+    // rearrange the column schema based on the sort order, if sort columns exist
+    List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
+    tableSchema.setListOfColumns(columnSchemas1);
     SchemaEvolution schemaEvol = new SchemaEvolution();
     schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
     tableSchema.setSchemaEvolution(schemaEvol);
@@ -352,6 +354,29 @@ public class StoreCreator {
     return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
   }
 
+  private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
+    List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
+    // add sort columns first
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (columnSchema.isSortColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    // add other dimension columns
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    // add measure columns
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (!columnSchema.isDimensionColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    return newColumnSchema;
+  }
+
   private void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
     BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream(factFilePath), "UTF-8"));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index be40b13..e810829 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -86,7 +86,9 @@ object DataLoadProcessBuilderOnSpark {
     val sortParameters = SortParameters.createSortParameters(configuration)
     val rowComparator: Comparator[Array[AnyRef]] =
       if (sortParameters.getNoDictionaryCount > 0) {
-        new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+        new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn,
+          sortParameters.getNoDictionarySortColumn,
+          sortParameters.getNoDictDataType)
       } else {
         new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index c97732a..727191c 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -805,7 +805,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       model.setSegmentId("0")
       store.createCarbonStore(model)
       FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
-      store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava))
+      store.setSortColumns(new util.ArrayList[String](Seq("country","phonetype").asJava))
       model = store.createTableAndLoadModel(false)
       model.setSegmentId("1")
       store.createCarbonStore(model)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 0584fb1..1897c87 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil, TaskMetricsMap}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.datamap.bloom.DataConvertUtil
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
@@ -264,8 +264,17 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
       rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
         surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
       } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
-        data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
+        val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
           indexCol2IdxInNoDictArray(col.getColName))
+        // no dictionary primitive columns are expected to hold the original data while loading,
+        // so convert the bytes back to the original data
+        if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) {
+          val dataFromBytes = DataTypeUtil
+            .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType)
+          dataFromBytes
+        } else {
+          bytes
+        }
       } else {
         // measures start from 1
         val value = data(1 + indexCol2IdxInMeasureArray(col.getColName))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index ad6823d..fcb6110 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2087 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
-    assertResult(2216)(result(0).getLong(1))
+    // 2220 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
+    assertResult(2220)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2216)(result(1).getLong(1))
+    assertResult(2220)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 4d85296..616edeb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -339,6 +340,45 @@ public class CarbonDataLoadConfiguration {
     return type;
   }
 
+  /**
+   * Get the no dictionary and the complex dimension columns of the table
+   *
+   * @return
+   */
+  public CarbonColumn[] getNoDictAndComplexDimensions() {
+    List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
+    int noDicCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (dataFields[i].getColumn().isDimension() && (
+          !(dataFields[i].getColumn().hasEncoding(Encoding.DICTIONARY)) || dataFields[i].getColumn()
+              .isComplex())) {
+        noDicOrCompIndexes.add(i);
+        noDicCount++;
+      }
+    }
+
+    CarbonColumn[] dims = new CarbonColumn[noDicCount];
+    for (int i = 0; i < dims.length; i++) {
+      dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
+    }
+    return dims;
+  }
+
+  /**
+   * Get the sort column mapping of the table
+   *
+   * @return
+   */
+  public boolean[] getSortColumnMapping() {
+    boolean[] sortColumnMapping = new boolean[dataFields.length];
+    for (int i = 0; i < sortColumnMapping.length; i++) {
+      if (dataFields[i].getColumn().getColumnSchema().isSortColumn()) {
+        sortColumnMapping[i] = true;
+      }
+    }
+    return sortColumnMapping;
+  }
+
   public int[] calcDimensionLengths() {
     int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
     if (!isSortTable()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 86f273d..7dfe95f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -111,6 +112,11 @@ public class FieldEncoderFactory {
             createComplexDataType(dataField, absoluteTableIdentifier,
                 client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
       } else {
+        // if the no dictionary column is a numeric column then treat it as a measure column
+        // so that the adaptive encoding can be applied on it easily
+        if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) {
+          return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+        }
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 9cbd607..20278e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -20,8 +20,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
@@ -39,10 +37,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   private int index;
 
-  private DataType dataType;
-
-  private CarbonMeasure measure;
-
   private String nullformat;
 
   private boolean isEmptyBadRecord;
@@ -51,8 +45,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
       boolean isEmptyBadRecord) {
-    this.dataType = dataField.getColumn().getDataType();
-    this.measure = (CarbonMeasure) dataField.getColumn();
     this.nullformat = nullformat;
     this.index = index;
     this.isEmptyBadRecord = isEmptyBadRecord;
@@ -73,20 +65,20 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     Object output;
     boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(literalValue);
     if (literalValue == null || isNull) {
-      String message = logHolder.getColumnMessageMap().get(measure.getColName());
+      String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
       if (null == message) {
-        message = CarbonDataProcessorUtil
-            .prepareFailureReason(measure.getColName(), measure.getDataType());
-        logHolder.getColumnMessageMap().put(measure.getColName(), message);
+        message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+            dataField.getColumn().getDataType());
+        logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
       }
       return null;
     } else if (literalValue.length() == 0) {
       if (isEmptyBadRecord) {
-        String message = logHolder.getColumnMessageMap().get(measure.getColName());
+        String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
         if (null == message) {
-          message = CarbonDataProcessorUtil
-              .prepareFailureReason(measure.getColName(), measure.getDataType());
-          logHolder.getColumnMessageMap().put(measure.getColName(), message);
+          message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+              dataField.getColumn().getDataType());
+          logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
         }
         logHolder.setReason(message);
       }
@@ -96,18 +88,24 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     } else {
       try {
         if (dataField.isUseActualData()) {
-          output =
-              DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure, true);
+          output = DataTypeUtil
+              .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+                  dataField.getColumn().getColumnSchema().getScale(),
+                  dataField.getColumn().getColumnSchema().getPrecision(), true);
         } else {
-          output = DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure);
+          output = DataTypeUtil
+              .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+                  dataField.getColumn().getColumnSchema().getScale(),
+                  dataField.getColumn().getColumnSchema().getPrecision());
         }
         return output;
       } catch (NumberFormatException e) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
         }
-        logHolder.setReason(
-            CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
+        logHolder.setReason(CarbonDataProcessorUtil
+            .prepareFailureReason(dataField.getColumn().getColName(),
+                dataField.getColumn().getDataType()));
         return null;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
index 64b64f5..3a325a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
@@ -21,7 +21,10 @@ import java.util.Comparator;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 /**
  * comparator for the converted row. The row has not been rearranged as 3-parted yet.
@@ -30,23 +33,38 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 public class RawRowComparator implements Comparator<CarbonRow> {
   private int[] sortColumnIndices;
   private boolean[] isSortColumnNoDict;
+  private DataType[] noDicDataTypes;
 
-  public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict) {
+  public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict,
+      DataType[] noDicDataTypes) {
     this.sortColumnIndices = sortColumnIndices;
     this.isSortColumnNoDict = isSortColumnNoDict;
+    this.noDicDataTypes = noDicDataTypes;
   }
 
   @Override
   public int compare(CarbonRow o1, CarbonRow o2) {
     int diff = 0;
     int i = 0;
+    int noDicIdx = 0;
     for (int colIdx : sortColumnIndices) {
       if (isSortColumnNoDict[i]) {
-        byte[] colA = (byte[]) o1.getObject(colIdx);
-        byte[] colB = (byte[]) o2.getObject(colIdx);
-        diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
-        if (diff != 0) {
-          return diff;
+        if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[noDicIdx])) {
+          // for no dictionary numeric column get comparator based on the data type
+          SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+              .getComparator(noDicDataTypes[noDicIdx]);
+          int difference = comparator.compare(o1.getObject(colIdx), o2.getObject(colIdx));
+          if (difference != 0) {
+            return difference;
+          }
+          noDicIdx++;
+        } else {
+          byte[] colA = (byte[]) o1.getObject(colIdx);
+          byte[] colB = (byte[]) o2.getObject(colIdx);
+          diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
+          if (diff != 0) {
+            return diff;
+          }
         }
       } else {
         int colA = (int) o1.getObject(colIdx);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 1ad7879..844e45e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -23,7 +23,7 @@ package org.apache.carbondata.processing.loading.row;
  */
 public class IntermediateSortTempRow {
   private int[] dictSortDims;
-  private byte[][] noDictSortDims;
+  private Object[] noDictSortDims;
   /**
    * this will be used for intermediate merger when
    * no sort field and measure field will not be
@@ -35,14 +35,14 @@ public class IntermediateSortTempRow {
    */
   private Object[] measures;
 
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+  public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
       byte[] noSortDimsAndMeasures) {
     this.dictSortDims = dictSortDims;
     this.noDictSortDims = noDictSortDims;
     this.noSortDimsAndMeasures = noSortDimsAndMeasures;
   }
 
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+  public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
       Object[] measures) {
     this.dictSortDims = dictSortDims;
     this.noDictSortDims = noDictSortDims;
@@ -57,7 +57,7 @@ public class IntermediateSortTempRow {
     return measures;
   }
 
-  public byte[][] getNoDictSortDims() {
+  public Object[] getNoDictSortDims() {
     return noDictSortDims;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 697f590..edfd317 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -23,10 +23,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
@@ -65,6 +68,14 @@ public class SortStepRowHandler implements Serializable {
 
   private DataType[] dataTypes;
 
+  private DataType[] noDictSortDataTypes;
+
+  private boolean[] noDictSortColMapping;
+
+  private DataType[] noDictNoSortDataTypes;
+
+  private boolean[] noDictNoSortColMapping;
+
   /**
    * constructor
    * @param tableFieldStat table field stat
@@ -85,6 +96,16 @@ public class SortStepRowHandler implements Serializable {
     this.complexDimIdx = tableFieldStat.getComplexDimIdx();
     this.measureIdx = tableFieldStat.getMeasureIdx();
     this.dataTypes = tableFieldStat.getMeasureDataType();
+    this.noDictSortDataTypes = tableFieldStat.getNoDictSortDataType();
+    noDictSortColMapping = new boolean[noDictSortDataTypes.length];
+    for (int i = 0; i < noDictSortDataTypes.length; i++) {
+      noDictSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictSortDataTypes[i]);
+    }
+    this.noDictNoSortDataTypes = tableFieldStat.getNoDictNoSortDataType();
+    noDictNoSortColMapping = new boolean[noDictNoSortDataTypes.length];
+    for (int i = 0; i < noDictNoSortDataTypes.length; i++) {
+      noDictNoSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]);
+    }
   }
 
   /**
@@ -108,8 +129,8 @@ public class SortStepRowHandler implements Serializable {
     try {
       int[] dictDims
           = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
-                                       + this.varcharDimCnt + this.complexDimCnt ][];
+      Object[] nonDictArray = new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+                                       + this.varcharDimCnt + this.complexDimCnt];
       Object[] measures = new Object[this.measureCnt];
 
       // convert dict & data
@@ -125,19 +146,19 @@ public class SortStepRowHandler implements Serializable {
       // convert no-dict & sort
       idxAcc = 0;
       for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.noDictSortDimIdx[idx]];
       }
       // convert no-dict & no-sort
       for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.noDictNoSortDimIdx[idx]];
       }
       // convert varchar dims
       for (int idx = 0; idx < this.varcharDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.varcharDimIdx[idx]];
       }
       // convert complex dims
       for (int idx = 0; idx < this.complexDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.complexDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.complexDimIdx[idx]];
       }
 
       // convert measure data
@@ -178,7 +199,7 @@ public class SortStepRowHandler implements Serializable {
   public IntermediateSortTempRow readWithoutNoSortFieldConvert(
       DataInputStream inputStream) throws IOException {
     int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+    Object[] noDictSortDims = new Object[this.noDictSortDimCnt];
 
     // read dict & sort dim data
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -187,10 +208,8 @@ public class SortStepRowHandler implements Serializable {
 
     // read no-dict & sort data
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
+      // for no dict measure column get the original data
+      noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
     }
 
     // read no-dict dims & measures
@@ -213,9 +232,9 @@ public class SortStepRowHandler implements Serializable {
   public IntermediateSortTempRow readWithNoSortFieldConvert(
       DataInputStream inputStream) throws IOException {
     int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictSortDims =
-        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
-            + this.complexDimCnt][];
+    Object[] noDictSortDims =
+        new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt];
 
     // read dict & sort dim data
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -224,10 +243,8 @@ public class SortStepRowHandler implements Serializable {
 
     // read no-dict & sort data
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
+      // for no dict measure column get the original data
+      noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
     }
 
     // read no-dict dims & measures
@@ -240,8 +257,63 @@ public class SortStepRowHandler implements Serializable {
     return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure);
   }
 
+  /**
+   * Return the data from the stream according to the column type
+   *
+   * @param inputStream
+   * @param idx
+   * @throws IOException
+   */
+  private Object getDataForNoDictSortColumn(DataInputStream inputStream, int idx)
+      throws IOException {
+    if (this.noDictSortColMapping[idx]) {
+      return readDataFromStream(inputStream, idx);
+    } else {
+      short len = inputStream.readShort();
+      byte[] bytes = new byte[len];
+      inputStream.readFully(bytes);
+      return bytes;
+    }
+  }
+
+  /**
+   * Read the data from the stream
+   *
+   * @param inputStream
+   * @param idx
+   * @return
+   * @throws IOException
+   */
+  private Object readDataFromStream(DataInputStream inputStream, int idx) throws IOException {
+    DataType dataType = noDictSortDataTypes[idx];
+    Object data = null;
+    if (!inputStream.readBoolean()) {
+      return null;
+    }
+    if (dataType == DataTypes.BOOLEAN) {
+      data = inputStream.readBoolean();
+    } else if (dataType == DataTypes.BYTE) {
+      data = inputStream.readByte();
+    } else if (dataType == DataTypes.SHORT) {
+      data = inputStream.readShort();
+    } else if (dataType == DataTypes.INT) {
+      data = inputStream.readInt();
+    } else if (dataType == DataTypes.LONG) {
+      data = inputStream.readLong();
+    } else if (dataType == DataTypes.DOUBLE) {
+      data = inputStream.readDouble();
+    } else if (dataType == DataTypes.FLOAT) {
+      data = inputStream.readFloat();
+    } else if (dataType == DataTypes.BYTE_ARRAY || DataTypes.isDecimal(dataType)) {
+      byte[] bytes =
+          inputStream.readUTF().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      data = bytes;
+    }
+    return data;
+  }
+
   private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
-      byte[][] noDictDims, Object[] measures) {
+      Object[] noDictDims, Object[] measures) {
     ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
     // read dict_no_sort
     for (int i = dictSortDimCnt; i < dictDims.length; i++) {
@@ -251,10 +323,15 @@ public class SortStepRowHandler implements Serializable {
     int noDictIndex = noDictSortDimCnt;
     // read no_dict_no_sort
     for (int i = 0; i < noDictNoSortDimCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      noDictDims[noDictIndex++] = bytes;
+      // for no dict measure column get the original data
+      if (this.noDictNoSortColMapping[i]) {
+        noDictDims[noDictIndex++] = getDataFromRowBuffer(noDictNoSortDataTypes[i], rowBuffer);
+      } else {
+        short len = rowBuffer.getShort();
+        byte[] bytes = new byte[len];
+        rowBuffer.get(bytes);
+        noDictDims[noDictIndex++] = bytes;
+      }
     }
 
     // read varchar dims
@@ -275,39 +352,49 @@ public class SortStepRowHandler implements Serializable {
 
     // read measure
     int measureCnt = measures.length;
-    DataType tmpDataType;
     Object tmpContent;
     for (short idx = 0 ; idx < measureCnt; idx++) {
-      if ((byte) 0 == rowBuffer.get()) {
-        measures[idx] = null;
-        continue;
-      }
+      tmpContent = getDataFromRowBuffer(dataTypes[idx], rowBuffer);
+      measures[idx] = tmpContent;
+    }
+  }
 
-      tmpDataType = dataTypes[idx];
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((byte) 1 == rowBuffer.get()) {
-          tmpContent = true;
-        } else {
-          tmpContent = false;
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        tmpContent = rowBuffer.getShort();
-      } else if (DataTypes.INT == tmpDataType) {
-        tmpContent = rowBuffer.getInt();
-      } else if (DataTypes.LONG == tmpDataType) {
-        tmpContent = rowBuffer.getLong();
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        tmpContent = rowBuffer.getDouble();
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        short len = rowBuffer.getShort();
-        byte[] decimalBytes = new byte[len];
-        rowBuffer.get(decimalBytes);
-        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+  /**
+   * Retrieve/Get the data from the row buffer.
+   *
+   * @param tmpDataType
+   * @param rowBuffer
+   * @return
+   */
+  private Object getDataFromRowBuffer(DataType tmpDataType, ByteBuffer rowBuffer) {
+    Object tmpContent;
+    if ((byte) 0 == rowBuffer.get()) {
+      return null;
+    }
+
+    if (DataTypes.BOOLEAN == tmpDataType) {
+      if ((byte) 1 == rowBuffer.get()) {
+        tmpContent = true;
       } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+        tmpContent = false;
       }
-      measures[idx] = tmpContent;
+    } else if (DataTypes.SHORT == tmpDataType) {
+      tmpContent = rowBuffer.getShort();
+    } else if (DataTypes.INT == tmpDataType) {
+      tmpContent = rowBuffer.getInt();
+    } else if (DataTypes.LONG == tmpDataType) {
+      tmpContent = rowBuffer.getLong();
+    } else if (DataTypes.DOUBLE == tmpDataType) {
+      tmpContent = rowBuffer.getDouble();
+    } else if (DataTypes.isDecimal(tmpDataType)) {
+      short len = rowBuffer.getShort();
+      byte[] decimalBytes = new byte[len];
+      rowBuffer.get(decimalBytes);
+      tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
+    return tmpContent;
   }
 
   /**
@@ -327,9 +414,14 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort dim
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
+      if (this.noDictSortColMapping[idx]) {
+        // write the original data to the stream
+        writeDataToStream(sortTempRow.getNoDictSortDims()[idx], outputStream, idx);
+      } else {
+        byte[] bytes = (byte[]) sortTempRow.getNoDictSortDims()[idx];
+        outputStream.writeShort(bytes.length);
+        outputStream.write(bytes);
+      }
     }
 
     // write packed no-sort dim & measure
@@ -359,9 +451,14 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
+      if (this.noDictSortColMapping[idx]) {
+        // write the original data to the stream
+        writeDataToStream(row[this.noDictSortDimIdx[idx]], outputStream, idx);
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+        outputStream.writeShort(bytes.length);
+        outputStream.write(bytes);
+      }
     }
 
     // pack no-sort
@@ -376,6 +473,46 @@ public class SortStepRowHandler implements Serializable {
   }
 
   /**
+   * Write the data to stream
+   *
+   * @param data
+   * @param outputStream
+   * @param idx
+   * @throws IOException
+   */
+  private void writeDataToStream(Object data, DataOutputStream outputStream, int idx)
+      throws IOException {
+    DataType dataType = noDictSortDataTypes[idx];
+    if (null == data) {
+      outputStream.writeBoolean(false);
+    } else {
+      outputStream.writeBoolean(true);
+      if (dataType == DataTypes.BOOLEAN) {
+        outputStream.writeBoolean((boolean) data);
+      } else if (dataType == DataTypes.BYTE) {
+        outputStream.writeByte((byte) data);
+      } else if (dataType == DataTypes.SHORT) {
+        outputStream.writeShort((short) data);
+      } else if (dataType == DataTypes.INT) {
+        outputStream.writeInt((int) data);
+      } else if (dataType == DataTypes.LONG) {
+        outputStream.writeLong((long) data);
+      } else if (dataType == DataTypes.DOUBLE) {
+        outputStream.writeDouble((double) data);
+      } else if (DataTypes.isDecimal(dataType)) {
+        BigDecimal val = (BigDecimal) data;
+        byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+        outputStream.writeShort(bigDecimalInBytes.length);
+        outputStream.write(bigDecimalInBytes);
+      } else if (dataType == DataTypes.FLOAT) {
+        outputStream.writeFloat((float) data);
+      } else if (dataType == DataTypes.BYTE_ARRAY) {
+        outputStream.writeUTF(data.toString());
+      }
+    }
+  }
+
+  /**
    * Read intermediate sort temp row from unsafe memory.
    * This method is used during merge sort phase for off-heap sort.
    *
@@ -430,9 +567,9 @@ public class SortStepRowHandler implements Serializable {
     int size = 0;
 
     int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictSortDims =
-        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
-            + this.complexDimCnt][];
+    Object[] noDictSortDims =
+        new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt];
 
     // read dict & sort dim
     for (int idx = 0; idx < dictSortDimCnt; idx++) {
@@ -444,11 +581,24 @@ public class SortStepRowHandler implements Serializable {
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
       short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
       size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-      noDictSortDims[idx] = bytes;
+      if (this.noDictSortColMapping[idx]) {
+        // get the original data from the unsafe memory
+        if (0 == length) {
+          // if the length is 0, then the data is null
+          noDictSortDims[idx] = null;
+        } else {
+          Object data = CarbonUnsafeUtil
+              .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+          size += length;
+          noDictSortDims[idx] = data;
+        }
+      } else {
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        size += length;
+        noDictSortDims[idx] = bytes;
+      }
     }
 
     // read no-sort dims & measures
@@ -487,13 +637,26 @@ public class SortStepRowHandler implements Serializable {
     for (int idx = 0; idx < noDictSortDimCnt; idx++) {
       short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
       size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
+      if (this.noDictSortColMapping[idx]) {
+        // get the original data from unsafe memory
+        if (0 == length) {
+          // if the length is 0, then the data is null
+          writeDataToStream(null, outputStream, idx);
+        } else {
+          Object data = CarbonUnsafeUtil
+              .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+          size += length;
+          writeDataToStream(data, outputStream, idx);
+        }
+      } else {
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        size += length;
 
-      outputStream.writeShort(length);
-      outputStream.write(bytes);
+        outputStream.writeShort(length);
+        outputStream.write(bytes);
+      }
     }
 
     // packed no-sort & measure
@@ -534,13 +697,31 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
-              bytes.length);
-      size += bytes.length;
+      if (this.noDictSortColMapping[idx]) {
+        Object data = row[this.noDictSortDimIdx[idx]];
+        if (null == data) {
+          // if the data is null, then write only the length as 0.
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 0);
+          size += 2;
+        } else {
+          int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes();
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes);
+          size += 2;
+          // put data to unsafe according to the data types
+          CarbonUnsafeUtil
+              .putDataToUnsafe(noDictSortDataTypes[idx], data, baseObject, address, size,
+                  sizeInBytes);
+          size += sizeInBytes;
+        }
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+        CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+                bytes.length);
+        size += bytes.length;
+      }
     }
 
     // convert pack no-sort
@@ -574,9 +755,15 @@ public class SortStepRowHandler implements Serializable {
     }
     // convert no-dict & no-sort
     for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      rowBuffer.putShort((short) bytes.length);
-      rowBuffer.put(bytes);
+      if (this.noDictNoSortColMapping[idx]) {
+        // put the original data to buffer
+        putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]],
+            rowBuffer);
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+        rowBuffer.putShort((short) bytes.length);
+        rowBuffer.put(bytes);
+      }
     }
     // convert varchar dims
     for (int idx = 0; idx < this.varcharDimCnt; idx++) {
@@ -592,37 +779,45 @@ public class SortStepRowHandler implements Serializable {
     }
 
     // convert measure
-    Object tmpValue;
-    DataType tmpDataType;
     for (int idx = 0; idx < this.measureCnt; idx++) {
-      tmpValue = row[this.measureIdx[idx]];
-      tmpDataType = this.dataTypes[idx];
-      if (null == tmpValue) {
-        rowBuffer.put((byte) 0);
-        continue;
-      }
-      rowBuffer.put((byte) 1);
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((boolean) tmpValue) {
-          rowBuffer.put((byte) 1);
-        } else {
-          rowBuffer.put((byte) 0);
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        rowBuffer.putShort((Short) tmpValue);
-      } else if (DataTypes.INT == tmpDataType) {
-        rowBuffer.putInt((Integer) tmpValue);
-      } else if (DataTypes.LONG == tmpDataType) {
-        rowBuffer.putLong((Long) tmpValue);
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        rowBuffer.putDouble((Double) tmpValue);
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
-        rowBuffer.putShort((short) decimalBytes.length);
-        rowBuffer.put(decimalBytes);
+      putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], rowBuffer);
+    }
+  }
+
+  /**
+   * Put the data to the row buffer
+   *
+   * @param tmpDataType
+   * @param tmpValue
+   * @param rowBuffer
+   */
+  private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, ByteBuffer rowBuffer) {
+    if (null == tmpValue) {
+      rowBuffer.put((byte) 0);
+      return;
+    }
+    rowBuffer.put((byte) 1);
+    if (DataTypes.BOOLEAN == tmpDataType) {
+      if ((boolean) tmpValue) {
+        rowBuffer.put((byte) 1);
       } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+        rowBuffer.put((byte) 0);
       }
+    } else if (DataTypes.SHORT == tmpDataType) {
+      rowBuffer.putShort((Short) tmpValue);
+    } else if (DataTypes.INT == tmpDataType) {
+      rowBuffer.putInt((Integer) tmpValue);
+    } else if (DataTypes.LONG == tmpDataType) {
+      rowBuffer.putLong((Long) tmpValue);
+    } else if (DataTypes.DOUBLE == tmpDataType) {
+      rowBuffer.putDouble((Double) tmpValue);
+    } else if (DataTypes.isDecimal(tmpDataType)) {
+      byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
+      rowBuffer.putShort((short) decimalBytes.length);
+      rowBuffer.put(decimalBytes);
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 8f29cee..b0109fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -20,7 +20,11 @@ package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
 import java.util.Comparator;
 
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -52,6 +56,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowA = rowL.address;
     long rowB = rowR.address;
     int sizeInDictPartA = 0;
+    int noDicSortIdx = 0;
 
     int sizeInNonDictPartA = 0;
     int sizeInDictPartB = 0;
@@ -60,25 +65,50 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
       if (isNoDictionary) {
         short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
             rowA + dictSizeInMemory + sizeInNonDictPartA);
-        byte[] byteArr1 = new byte[lengthA];
         sizeInNonDictPartA += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
-                byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
-        sizeInNonDictPartA += lengthA;
-
         short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
             rowB + dictSizeInMemory + sizeInNonDictPartB);
-        byte[] byteArr2 = new byte[lengthB];
         sizeInNonDictPartB += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
-                byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
-        sizeInNonDictPartB += lengthB;
+        DataType dataType = tableFieldStat.getNoDictDataType()[noDicSortIdx++];
+        if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+          Object data1 = null;
+          if (0 != lengthA) {
+            data1 = CarbonUnsafeUtil
+                .getDataFromUnsafe(dataType, baseObjectL, rowA + dictSizeInMemory,
+                    sizeInNonDictPartA, lengthA);
+            sizeInNonDictPartA += lengthA;
+          }
+          Object data2 = null;
+          if (0 != lengthB) {
+            data2 = CarbonUnsafeUtil
+                .getDataFromUnsafe(dataType, baseObjectR, rowB + dictSizeInMemory,
+                    sizeInNonDictPartB, lengthB);
+            sizeInNonDictPartB += lengthB;
+          }
+          // use the data type based comparator for the no dictionary encoded columns
+          SerializableComparator comparator =
+              org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
+          int difference = comparator.compare(data1, data2);
+          if (difference != 0) {
+            return difference;
+          }
+        } else {
+          byte[] byteArr1 = new byte[lengthA];
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, byteArr1,
+                  CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
+          sizeInNonDictPartA += lengthA;
+
+          byte[] byteArr2 = new byte[lengthB];
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, byteArr2,
+                  CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
+          sizeInNonDictPartB += lengthB;
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+          int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+          if (difference != 0) {
+            return difference;
+          }
         }
       } else {
         int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 102b057..b805d37 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
@@ -43,6 +44,8 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
   private IntermediateSortTempRow currentRow;
 
+  private DataType[] noDictDataType;
+
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
       boolean[] noDictSortColumnMapping) {
     this.actualSize = merger.getEntryCount();
@@ -52,8 +55,10 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     for (UnsafeCarbonRowPage rowPage: rowPages) {
       rowPage.setReadConvertedNoSortField();
     }
+    this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+    this.comparator =
+        new IntermediateSortTempRowComparator(noDictSortColumnMapping, noDictDataType);
   }
 
   public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 02ffd68..baa9e71 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -45,7 +45,8 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     this.rowPage = rowPage;
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator = new IntermediateSortTempRowComparator(
-        rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+        rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
+        rowPage.getTableFieldStat().getNoDictDataType());
     this.rowPage.setReadConvertedNoSortField();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7c3c056..a991d4c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -109,7 +109,8 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.tableFieldStat = new TableFieldStat(parameters);
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors.newFixedThreadPool(1);
-    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
+        parameters.getNoDictDataType());
     this.convertNoSortFields = convertNoSortFields;
     initialize();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ac13d24..7683bbc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -273,19 +273,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     int dictIndex = 0;
     int nonDicIndex = 0;
     int[] dim = new int[this.dimensionCount];
-    byte[][] nonDicArray = new byte[this.noDictWithComplextCount][];
+    Object[] nonDicArray = new Object[this.noDictWithComplextCount];
     // read dimension values
     int dimCount = 0;
     for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
       if (isNoDictionaryDimensionColumn[dimCount]) {
-        nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+        nonDicArray[nonDicIndex++] = row.getObject(dimCount);
       } else {
         dim[dictIndex++] = (int) row.getObject(dimCount);
       }
     }
 
     for (; dimCount < this.dimensionWithComplexCount; dimCount++) {
-      nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+      nonDicArray[nonDicIndex++] = row.getObject(dimCount);
     }
 
     Object[] measures = new Object[measureCount];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index e3bc97f..ae9ec3d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerI
 import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -134,12 +135,16 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     // sort the range bounds (sort in carbon is a little different from what we think)
     Arrays.sort(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
-            sortColumnRangeInfo.getIsSortColumnNoDict()));
+            sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+                configuration.getTableIdentifier().getTableName())));
 
     // range partitioner to dispatch rows by sort columns
     this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
-            sortColumnRangeInfo.getIsSortColumnNoDict()));
+            sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+                configuration.getTableIdentifier().getTableName())));
   }
 
   // only convert sort column fields

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index ce8b62f..b921675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -266,8 +266,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       Object[] newData = new Object[data.length];
       for (int i = 0; i < data.length; i++) {
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
-          newData[i] = DataTypeUtil
-              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+          if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
+            // keep the no dictionary measure column as original data
+            newData[i] = data[orderOfData[i]];
+          } else {
+            newData[i] = DataTypeUtil
+                .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+          }
         } else {
           // if this is a complex column then recursively comver the data into Byte Array.
           if (dataTypes[i].isComplexType()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 0fc229a..1aa6da8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -91,6 +91,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * boolean mapping for no dictionary columns in schema
    */
   private boolean[] noDictionaryColMapping;
+
+  private boolean[] sortColumnMapping;
   /**
    * boolean mapping for long string dimension
    */
@@ -275,7 +277,15 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         preparedRow[i] = dictionaryValues[dictionaryIndex++];
       } else {
         // no dictionary dims
-        preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+        byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+        if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+          // no dictionary measure columns are expected as original data
+          preparedRow[i] = DataTypeUtil
+              .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
+                  dims.getDataType());
+        } else {
+          preparedRow[i] = noDictionaryKeyByIndex;
+        }
       }
     }
     // fill all the measures
@@ -357,6 +367,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     measureCount = carbonTable.getMeasureByTableName(tableName).size();
     List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
     noDictionaryColMapping = new boolean[dimensions.size()];
+    sortColumnMapping = new boolean[dimensions.size()];
     isVarcharDimMapping = new boolean[dimensions.size()];
     int i = 0;
     for (CarbonDimension dimension : dimensions) {
@@ -364,6 +375,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         i++;
         continue;
       }
+      if (dimension.isSortColumn()) {
+        sortColumnMapping[i] = true;
+      }
       noDictionaryColMapping[i] = true;
       if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
         isVarcharDimMapping[i] = true;
@@ -395,8 +409,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     return SortParameters
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
-            noDictionaryCount, segmentId,
-            carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
+            noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
+            sortColumnMapping, isVarcharDimMapping, true);
   }
 
   /**
@@ -404,14 +418,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * sort temp files
    */
   private void initializeFinalThreadMergerForMergeSort() {
-    boolean[] noDictionarySortColumnMapping = null;
-    if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) {
-      noDictionarySortColumnMapping = noDictionaryColMapping;
-    } else {
-      noDictionarySortColumnMapping = new boolean[this.segmentProperties.getNumberOfSortColumns()];
-      System.arraycopy(noDictionaryColMapping, 0,
-          noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
-    }
+    boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
+        .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName());
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b877d52..2911c05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -53,6 +54,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
   private SegmentProperties segprop;
   private CarbonLoadModel loadModel;
   private PartitionSpec partitionSpec;
+
+  CarbonColumn[] noDicAndComplexColumns;
   /**
    * record holder heap
    */
@@ -86,6 +89,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
     setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+    this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 
@@ -200,7 +204,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    * @throws SliceMergerException
    */
   private void addRow(Object[] carbonTuple) throws SliceMergerException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop);
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns);
     try {
       this.dataHandler.addDataToStore(row);
     } catch (CarbonDataWriterException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 2dc79a3..00fbc7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -37,6 +38,8 @@ public class RowResultProcessor {
   private CarbonFactHandler dataHandler;
   private SegmentProperties segmentProperties;
 
+  private CarbonColumn[] noDicAndComplexColumns;
+
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(RowResultProcessor.class.getName());
 
@@ -59,6 +62,7 @@ public class RowResultProcessor {
     //Note: set compaction flow just to convert decimal type
     carbonFactDataHandlerModel.setCompactionFlow(true);
     carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+    noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 
@@ -97,7 +101,8 @@ public class RowResultProcessor {
   }
 
   private void addRow(Object[] carbonTuple) throws CarbonDataWriterException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties,
+        noDicAndComplexColumns);
     try {
       this.dataHandler.addDataToStore(row);
     } catch (CarbonDataWriterException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
index 9b6d1e8..54fa99e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.util.Comparator;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 
 /**
@@ -31,11 +34,15 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
    */
   private boolean[] isSortColumnNoDictionary;
 
+  private DataType[] noDicSortDataTypes;
+
   /**
    * @param isSortColumnNoDictionary isSortColumnNoDictionary
    */
-  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
+  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary,
+      DataType[] noDicSortDataTypes) {
     this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+    this.noDicSortDataTypes = noDicSortDataTypes;
   }
 
   /**
@@ -45,18 +52,31 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
     int diff = 0;
     int dictIndex = 0;
     int nonDictIndex = 0;
+    int noDicTypeIdx = 0;
 
     for (boolean isNoDictionary : isSortColumnNoDictionary) {
 
       if (isNoDictionary) {
-        byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
-        byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
-        nonDictIndex++;
+        if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
+          // use data types based comparator for the no dictionary measure columns
+          SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+              .getComparator(noDicSortDataTypes[noDicTypeIdx]);
+          int difference = comparator.compare(rowA.getNoDictSortDims()[nonDictIndex],
+              rowB.getNoDictSortDims()[nonDictIndex]);
+          if (difference != 0) {
+            return difference;
+          }
+          noDicTypeIdx++;
+        } else {
+          byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+          byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+          int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+          if (difference != 0) {
+            return difference;
+          }
         }
+        nonDictIndex++;
       } else {
         int dimFieldA = rowA.getDictSortDims()[dictIndex];
         int dimFieldB = rowB.getDictSortDims()[dictIndex];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index f47ecc7..4dff644 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -20,7 +20,10 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.Serializable;
 import java.util.Comparator;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class NewRowComparator implements Comparator<Object[]>, Serializable {
   private static final long serialVersionUID = -1739874611112709436L;
@@ -28,13 +31,20 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
   /**
    * mapping of dictionary dimensions and no dictionary of sort_column.
    */
-  private boolean[] noDictionarySortColumnMaping;
+  private boolean[] noDicDimColMapping;
+
+  private DataType[] noDicDataTypes;
+
+  private boolean[] noDicSortColumnMapping;
 
   /**
-   * @param noDictionarySortColumnMaping
+   * @param noDicDimColMapping
    */
-  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  public NewRowComparator(boolean[] noDicDimColMapping, boolean[] noDicSortColumnMapping,
+      DataType[] noDicDataTypes) {
+    this.noDicDimColMapping = noDicDimColMapping;
+    this.noDicSortColumnMapping = noDicSortColumnMapping;
+    this.noDicDataTypes = noDicDataTypes;
   }
 
   /**
@@ -43,15 +53,31 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
     int index = 0;
+    int dataTypeIdx = 0;
+    int noDicSortIdx = 0;
 
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-      if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[index];
-        byte[] byteArr2 = (byte[]) rowB[index];
+    for (int i = 0; i < noDicDimColMapping.length; i++) {
+      if (noDicDimColMapping[i]) {
+        if (noDicSortColumnMapping[noDicSortIdx++]) {
+          if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[dataTypeIdx])) {
+            // use data types based comparator for the no dictionary measure columns
+            SerializableComparator comparator =
+                org.apache.carbondata.core.util.comparator.Comparator
+                    .getComparator(noDicDataTypes[dataTypeIdx]);
+            int difference = comparator.compare(rowA[index], rowB[index]);
+            if (difference != 0) {
+              return difference;
+            }
+            dataTypeIdx++;
+          } else {
+            byte[] byteArr1 = (byte[]) rowA[index];
+            byte[] byteArr2 = (byte[]) rowB[index];
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+            int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+            if (difference != 0) {
+              return difference;
+            }
+          }
         }
       } else {
         int dimFieldA = (int) rowA[index];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index a5caf7b..730c729 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -203,7 +203,9 @@ public class SortDataRows {
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
       if (parameters.getNumberOfNoDictSortColumns() > 0) {
-        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
+        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionarySortColumn(),
+            parameters.getNoDictDataType()));
       } else {
         Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
       }
@@ -315,7 +317,8 @@ public class SortDataRows {
         long startTime = System.currentTimeMillis();
         if (parameters.getNumberOfNoDictSortColumns() > 0) {
           Arrays.sort(recordHolderArray,
-              new NewRowComparator(parameters.getNoDictionarySortColumn()));
+              new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(), parameters.getNoDictDataType()));
         } else {
           Arrays.sort(recordHolderArray,
               new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));