Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/18 13:38:15 UTC

[1/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

Repository: carbondata
Updated Branches:
  refs/heads/master 61fcdf286 -> c8f706304


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index d3d538a..c4416d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.File;
 import java.io.Serializable;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -88,6 +89,17 @@ public class SortParameters implements Serializable {
 
   private DataType[] measureDataType;
 
+  // no dictionary data types of the table
+  private DataType[] noDictDataType;
+
+  // data types of the no dictionary columns participating in sort
+  // used while writing the row to the sort temp file, where no-dict sort columns are handled separately
+  private DataType[] noDictSortDataType;
+
+  // data types of the no dictionary columns not participating in sort
+  // used while writing the row to the sort temp file, where no-dict no-sort columns are handled separately
+  private DataType[] noDictNoSortDataType;
+
   /**
    * To know how many columns are of high cardinality.
    */
@@ -111,6 +123,8 @@ public class SortParameters implements Serializable {
   private boolean[] noDictionaryDimnesionColumn;
 
   private boolean[] noDictionarySortColumn;
+
+  private boolean[] sortColumn;
   /**
    * whether dimension is varchar data type.
    * since all dimensions are string, we use an array of boolean instead of datatypes
@@ -142,11 +156,15 @@ public class SortParameters implements Serializable {
     parameters.databaseName = databaseName;
     parameters.tableName = tableName;
     parameters.measureDataType = measureDataType;
+    parameters.noDictDataType = noDictDataType;
+    parameters.noDictSortDataType = noDictSortDataType;
+    parameters.noDictNoSortDataType = noDictNoSortDataType;
     parameters.noDictionaryCount = noDictionaryCount;
     parameters.partitionID = partitionID;
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.sortColumn = sortColumn;
     parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
     parameters.noDictionarySortColumn = noDictionarySortColumn;
     parameters.numberOfSortColumns = numberOfSortColumns;
@@ -382,7 +400,10 @@ public class SortParameters implements Serializable {
 
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    setNoDictionarySortColumnMapping(parameters);
+    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+        .getNoDictSortColMapping(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName()));
+    parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties
@@ -431,6 +452,14 @@ public class SortParameters implements Serializable {
 
     DataType[] measureDataType = configuration.getMeasureDataType();
     parameters.setMeasureDataType(measureDataType);
+    parameters.setNoDictDataType(CarbonDataProcessorUtil
+        .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName()));
+    Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+        .getNoDictSortAndNoSortDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+            configuration.getTableIdentifier().getTableName());
+    parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+    parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     return parameters;
   }
 
@@ -442,28 +471,10 @@ public class SortParameters implements Serializable {
     this.rangeId = rangeId;
   }
 
-  /**
-   * this method will set the boolean mapping for no dictionary sort columns
-   *
-   * @param parameters
-   */
-  private static void setNoDictionarySortColumnMapping(SortParameters parameters) {
-    if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) {
-      parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn());
-    } else {
-      boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()];
-      System
-          .arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, noDictionarySortColumnTemp, 0,
-              parameters.getNumberOfSortColumns());
-      parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp);
-    }
-  }
-
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
-      int noDictionaryCount, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn,
-      boolean isCompactionFlow) {
+      int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping,
+      boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
@@ -478,6 +489,7 @@ public class SortParameters implements Serializable {
     parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     parameters.setComplexDimColCount(complexDimColCount);
     parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    parameters.setSortColumn(sortColumnMapping);
     parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -523,8 +535,46 @@ public class SortParameters implements Serializable {
         .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
             parameters.getTableName());
     parameters.setMeasureDataType(type);
-    setNoDictionarySortColumnMapping(parameters);
+    parameters.setNoDictDataType(CarbonDataProcessorUtil
+        .getNoDictDataTypes(parameters.getDatabaseName(), parameters.getTableName()));
+    Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+        .getNoDictSortAndNoSortDataTypes(parameters.getDatabaseName(), parameters.getTableName());
+    parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+    parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+        .getNoDictSortColMapping(parameters.getDatabaseName(), parameters.getTableName()));
     return parameters;
   }
 
+  public DataType[] getNoDictSortDataType() {
+    return noDictSortDataType;
+  }
+
+  public void setNoDictSortDataType(DataType[] noDictSortDataType) {
+    this.noDictSortDataType = noDictSortDataType;
+  }
+
+  public DataType[] getNoDictNoSortDataType() {
+    return noDictNoSortDataType;
+  }
+
+  public DataType[] getNoDictDataType() {
+    return noDictDataType;
+  }
+
+  public void setNoDictNoSortDataType(DataType[] noDictNoSortDataType) {
+    this.noDictNoSortDataType = noDictNoSortDataType;
+  }
+
+  public void setNoDictDataType(DataType[] noDictDataType) {
+    this.noDictDataType = noDictDataType;
+  }
+
+  public boolean[] getSortColumn() {
+    return sortColumn;
+  }
+
+  public void setSortColumn(boolean[] sortColumn) {
+    this.sortColumn = sortColumn;
+  }
 }
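
The three new DataType arrays exist because no-dictionary sort and no-sort fields are serialized differently when a row is written to the sort temp file: primitives can use fixed-width type-aware encoding while strings stay length-value (LV) encoded. A minimal standalone sketch of that idea, with an illustrative Kind enum and writeField helper (not the CarbonData API):

import java.io.DataOutputStream;
import java.io.IOException;

final class SortTempFieldWriterSketch {
  enum Kind { INT, LONG, DOUBLE, STRING } // illustrative stand-in for DataType

  static void writeField(DataOutputStream out, Kind type, Object value) throws IOException {
    switch (type) {
      case INT:    out.writeInt((Integer) value); break;   // 4 fixed bytes
      case LONG:   out.writeLong((Long) value); break;     // 8 fixed bytes
      case DOUBLE: out.writeDouble((Double) value); break; // 8 fixed bytes
      case STRING:                                         // LV: short length + bytes
        byte[] bytes = (byte[]) value;
        out.writeShort(bytes.length);
        out.write(bytes);
        break;
    }
  }
}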

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index e39fe1d..a1ef04e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -115,7 +115,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.tableFieldStat = new TableFieldStat(sortParameters);
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.comparator = new IntermediateSortTempRowComparator(
-        tableFieldStat.getIsSortColNoDictFlags());
+        tableFieldStat.getIsSortColNoDictFlags(), tableFieldStat.getNoDictDataType());
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
     this.convertToActualField = convertToActualField;
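
The comparator now also receives the no-dictionary data types, presumably so that primitive sort columns are compared on their decoded values instead of raw bytes. A hedged standalone sketch of such a two-mode comparison (the Object[] row layout and the isPrimitive flags are illustrative, not the actual sort temp row format):

import java.util.Comparator;

final class NoDictSortFieldComparatorSketch implements Comparator<Object[]> {
  private final boolean[] isPrimitive; // one flag per no-dict sort field

  NoDictSortFieldComparatorSketch(boolean[] isPrimitive) {
    this.isPrimitive = isPrimitive;
  }

  @Override
  @SuppressWarnings("unchecked")
  public int compare(Object[] row1, Object[] row2) {
    for (int i = 0; i < isPrimitive.length; i++) {
      int cmp = isPrimitive[i]
          ? ((Comparable<Object>) row1[i]).compareTo(row2[i]) // typed comparison
          : compareBytes((byte[]) row1[i], (byte[]) row2[i]); // byte-wise comparison
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  // unsigned lexicographic byte comparison
  private static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }
}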

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 353ddb4..e9ed6f3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -44,6 +44,9 @@ public class TableFieldStat implements Serializable {
   private boolean[] isVarcharDimFlags;
   private int measureCnt;
   private DataType[] measureDataType;
+  private DataType[] noDictDataType;
+  private DataType[] noDictSortDataType;
+  private DataType[] noDictNoSortDataType;
 
   // indices for dict & sort dimension columns
   private int[] dictSortDimIdx;
@@ -66,17 +69,20 @@ public class TableFieldStat implements Serializable {
     this.complexDimCnt = sortParameters.getComplexDimColCount();
     this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
     this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
-    int sortColCnt = isSortColNoDictFlags.length;
-    for (boolean flag : isSortColNoDictFlags) {
-      if (flag) {
+    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
+    boolean[] sortColumn = sortParameters.getSortColumn();
+    for (int i = 0; i < isDimNoDictFlags.length; i++) {
+      if (isDimNoDictFlags[i] && sortColumn[i]) {
         noDictSortDimCnt++;
-      } else {
+      } else if (!isDimNoDictFlags[i] && sortColumn[i]) {
         dictSortDimCnt++;
       }
     }
     this.measureCnt = sortParameters.getMeasureColCount();
     this.measureDataType = sortParameters.getMeasureDataType();
-
+    this.noDictDataType = sortParameters.getNoDictDataType();
+    this.noDictSortDataType = sortParameters.getNoDictSortDataType();
+    this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
     for (boolean flag : isVarcharDimFlags) {
       if (flag) {
         varcharDimCnt++;
@@ -97,19 +103,18 @@ public class TableFieldStat implements Serializable {
     int tmpDictSortCnt = 0;
     int tmpDictNoSortCnt = 0;
     int tmpVarcharCnt = 0;
-    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
 
     for (int i = 0; i < isDimNoDictFlags.length; i++) {
       if (isDimNoDictFlags[i]) {
         if (isVarcharDimFlags[i]) {
           varcharDimIdx[tmpVarcharCnt++] = i;
-        } else if (i < sortColCnt && isSortColNoDictFlags[i]) {
+        } else if (sortColumn[i]) {
           noDictSortDimIdx[tmpNoDictSortCnt++] = i;
         } else {
           noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
         }
       } else {
-        if (i < sortColCnt && !isSortColNoDictFlags[i]) {
+        if (sortColumn[i]) {
           dictSortDimIdx[tmpDictSortCnt++] = i;
         } else {
           dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
@@ -217,4 +222,17 @@ public class TableFieldStat implements Serializable {
     return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
         noDictNoSortDimCnt, complexDimCnt, varcharDimCnt, measureCnt);
   }
+
+  public DataType[] getNoDictSortDataType() {
+    return noDictSortDataType;
+  }
+
+  public DataType[] getNoDictNoSortDataType() {
+    return noDictNoSortDataType;
+  }
+
+
+  public DataType[] getNoDictDataType() {
+    return noDictDataType;
+  }
 }
\ No newline at end of file
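
The counting loop above classifies every dimension by the pair (no-dictionary?, sort column?) rather than relying on a sort-column prefix length. A small runnable example of the four resulting buckets, with a hypothetical four-column schema:

public class DimBucketSketch {
  public static void main(String[] args) {
    // Hypothetical schema: name(noDict,sort), id(dict,sort), addr(noDict,noSort), city(dict,noSort)
    boolean[] isDimNoDictFlags = { true, false, true, false };
    boolean[] sortColumn       = { true, true,  false, false };
    int noDictSortDimCnt = 0, dictSortDimCnt = 0, noDictNoSortDimCnt = 0, dictNoSortDimCnt = 0;
    for (int i = 0; i < isDimNoDictFlags.length; i++) {
      if (isDimNoDictFlags[i] && sortColumn[i]) {
        noDictSortDimCnt++;          // no-dict sort bucket
      } else if (!isDimNoDictFlags[i] && sortColumn[i]) {
        dictSortDimCnt++;            // dict sort bucket
      } else if (isDimNoDictFlags[i]) {
        noDictNoSortDimCnt++;        // no-dict no-sort bucket
      } else {
        dictNoSortDimCnt++;          // dict no-sort bucket
      }
    }
    // prints: 1 1 1 1 -- one dimension per bucket
    System.out.println(noDictSortDimCnt + " " + dictSortDimCnt + " "
        + noDictNoSortDimCnt + " " + dictNoSortDimCnt);
  }
}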

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7151b47..c23b071 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
@@ -240,9 +241,17 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean isVarcharColumnFull(CarbonRow row) {
     if (model.getVarcharDimIdxInNoDict().size() > 0) {
-      byte[][] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
-        varcharColumnSizeInByte[i] += nonDictArray[model.getVarcharDimIdxInNoDict().get(i)].length;
+        if (DataTypeUtil
+            .isPrimitiveColumn(model.getNoDictAndComplexColumns()[i].getDataType())) {
+          // get the size from the data type
+          varcharColumnSizeInByte[i] +=
+              model.getNoDictAndComplexColumns()[i].getDataType().getSizeInBytes();
+        } else {
+          varcharColumnSizeInByte[i] +=
+              ((byte[]) nonDictArray[model.getVarcharDimIdxInNoDict().get(i)]).length;
+        }
         if (SnappyCompressor.MAX_BYTE_TO_COMPRESS -
                 (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 20)) {
           LOGGER.info("Limited by varchar column, page size is " + dataRows.size());
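
For primitive no-dictionary columns the running page-size estimate can add the fixed width of the data type instead of a byte[] length. A standalone sketch of the headroom check against the compressor limit (SNAPPY_MAX is an assumed stand-in for SnappyCompressor.MAX_BYTE_TO_COMPRESS; the 4-byte-per-row bookkeeping and the 2 MB margin follow the code above):

final class PageSizeGuardSketch {
  static final long SNAPPY_MAX = Integer.MAX_VALUE - 100; // assumed compressor input limit

  // true when the accumulated column bytes leave less than 2 MB of headroom
  static boolean isColumnFull(long accumulatedColumnBytes, int rowCount) {
    return SNAPPY_MAX - (accumulatedColumnBytes + rowCount * 4L) < (2 << 20);
  }

  // fixed width for primitives, actual length for variable-length values
  static int sizeOf(Object value, Integer fixedWidthOrNull) {
    return fixedWidthOrNull != null ? fixedWidthOrNull : ((byte[]) value).length;
  }
}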

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1a38de6..4b42bfc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -125,6 +125,12 @@ public class CarbonFactDataHandlerModel {
    * data type of all measures in the table
    */
   private DataType[] measureDataType;
+
+  /**
+   * no dictionary and complex columns in the table
+   */
+  private CarbonColumn[] noDictAndComplexColumns;
+
   /**
    * carbon data file attributes like task id, file stamp
    */
@@ -276,6 +282,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
     carbonFactDataHandlerModel.setMeasureDataType(configuration.getMeasureDataType());
+    carbonFactDataHandlerModel
+        .setNoDictAndComplexColumns(configuration.getNoDictAndComplexDimensions());
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
@@ -325,12 +333,20 @@ public class CarbonFactDataHandlerModel {
     List<CarbonDimension> allDimensions = carbonTable.getDimensions();
     int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension()
             - segmentProperties.getComplexDimensions().size();
+    CarbonColumn[] noDicAndComplexColumns =
+        new CarbonColumn[segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
+            .getComplexDimensions().size()];
+    int noDicAndComp = 0;
     for (CarbonDimension dim : allDimensions) {
       if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) &&
           dim.getDataType() == DataTypes.VARCHAR) {
         // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
         varcharDimIdxInNoDict.add(dim.getOrdinal() - dictDimCount);
       }
+      if (!dim.hasEncoding(Encoding.DICTIONARY)) {
+        noDicAndComplexColumns[noDicAndComp++] =
+            new CarbonColumn(dim.getColumnSchema(), dim.getOrdinal(), dim.getSchemaOrdinal());
+      }
     }
 
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
@@ -365,6 +381,7 @@ public class CarbonFactDataHandlerModel {
       measureDataTypes[i++] = msr.getDataType();
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
+    carbonFactDataHandlerModel.setNoDictAndComplexColumns(noDicAndComplexColumns);
     CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
@@ -713,5 +730,13 @@ public class CarbonFactDataHandlerModel {
   public void setColumnCompressor(String columnCompressor) {
     this.columnCompressor = columnCompressor;
   }
+
+  public CarbonColumn[] getNoDictAndComplexColumns() {
+    return noDictAndComplexColumns;
+  }
+
+  public void setNoDictAndComplexColumns(CarbonColumn[] noDictAndComplexColumns) {
+    this.noDictAndComplexColumns = noDictAndComplexColumns;
+  }
 }
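
The model now carries the no-dictionary and complex columns so that downstream writers can look up each field's data type. The collection logic keeps every dimension without the DICTIONARY encoding, in schema order; a compact sketch with a stand-in Dim class (not the CarbonDimension API):

import java.util.ArrayList;
import java.util.List;

final class Dim {
  final String name;
  final boolean hasDictionaryEncoding;
  Dim(String name, boolean hasDictionaryEncoding) {
    this.name = name;
    this.hasDictionaryEncoding = hasDictionaryEncoding;
  }
}

final class NoDictColumnCollectorSketch {
  // Keep every dimension without the DICTIONARY encoding, preserving schema order.
  static List<Dim> collect(List<Dim> allDimensions) {
    List<Dim> result = new ArrayList<>();
    for (Dim d : allDimensions) {
      if (!d.hasDictionaryEncoding) {
        result.add(d);
      }
    }
    return result;
  }
}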
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index a311483..2f49ef2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
@@ -60,6 +62,9 @@ import org.apache.carbondata.processing.datatypes.GenericDataType;
  */
 public class TablePage {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TablePage.class.getName());
+
   // For all dimension and measure columns, we store the column data directly in the page,
   // the length of the page is the number of rows.
 
@@ -125,10 +130,24 @@ public class TablePage {
           page = ColumnPage.newLocalDictPage(
               columnPageEncoderMeta, pageSize, localDictionaryGenerator, false);
         } else {
-          page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+          if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
+            columnPageEncoderMeta =
+                new ColumnPageEncoderMeta(spec, spec.getSchemaDataType(), columnCompressor);
+            // create the column page according to the data type for no dictionary numeric columns
+            if (DataTypes.isDecimal(spec.getSchemaDataType())) {
+              page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize);
+            } else {
+              page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+            }
+          } else {
+            page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
+          }
         }
+        // set the stats collector according to the data type of the columns
         if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+        } else if (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())) {
+          page.setStatsCollector(PrimitivePageStatsCollector.newInstance(spec.getSchemaDataType()));
         } else {
           page.setStatsCollector(LVShortStringStatsCollector.newInstance());
         }
@@ -194,22 +213,35 @@ public class TablePage {
     int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
       TableSpec tableSpec = model.getTableSpec();
-      byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       List<TableSpec.DimensionSpec> noDictionaryDimensionSpec =
           tableSpec.getNoDictionaryDimensionSpec();
+      Object[] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
         if (noDictionaryDimensionSpec.get(i).getSchemaDataType()
             == DataTypes.VARCHAR) {
-          byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]);
+          byte[] valueWithLength = addIntLengthToByteArray((byte[]) noDictAndComplex[i]);
           noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else if (i < noDictionaryCount) {
-          // noDictionary columns, since it is variable length, we need to prepare each
-          // element as LV result byte array (first two bytes are the length of the array)
-          byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]);
-          noDictDimensionPages[i].putData(rowId, valueWithLength);
+          if (DataTypeUtil
+              .isPrimitiveColumn(noDictDimensionPages[i].getColumnSpec().getSchemaDataType())) {
+            // put the actual data to the page
+            Object value = noDictAndComplex[i];
+            // in the compaction flow, a decimal-type value comes in as a Spark Decimal;
+            // convert it to java.math.BigDecimal.
+            if (DataTypes.isDecimal(noDictDimensionPages[i].getDataType()) && model
+                .isCompactionFlow() && value != null) {
+              value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
+            }
+            noDictDimensionPages[i].putData(rowId, value);
+          } else {
+            // noDictionary columns, since it is variable length, we need to prepare each
+            // element as LV result byte array (first two bytes are the length of the array)
+            byte[] valueWithLength = addShortLengthToByteArray((byte[]) noDictAndComplex[i]);
+            noDictDimensionPages[i].putData(rowId, valueWithLength);
+          }
         } else {
           // complex columns
-          addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
+          addComplexColumn(i - noDictionaryCount, rowId, (byte[]) noDictAndComplex[i]);
         }
       }
     }
@@ -373,7 +405,19 @@ public class TablePage {
           columnPageEncoder = encodingFactory.createEncoder(
               spec,
               noDictDimensionPages[noDictIndex]);
-          encodedPage = columnPageEncoder.encode(noDictDimensionPages[noDictIndex++]);
+          encodedPage = columnPageEncoder.encode(noDictDimensionPages[noDictIndex]);
+          if (LOGGER.isDebugEnabled()) {
+            DataType targetDataType =
+                columnPageEncoder.getTargetDataType(noDictDimensionPages[noDictIndex]);
+            if (null != targetDataType) {
+              LOGGER.debug(
+                  "Encoder result ---> Source data type: " + noDictDimensionPages[noDictIndex]
+                      .getDataType().getName() + " Destination data type: " + targetDataType
+                      .getName() + " for the column: " + noDictDimensionPages[noDictIndex]
+                      .getColumnSpec().getFieldName());
+            }
+          }
+          noDictIndex++;
           encodedDimensions.add(encodedPage);
           break;
         case COMPLEX:
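
Page construction and stats collection now branch on whether the no-dictionary column is primitive: primitives get a typed page (a decimal page for decimals) and a primitive min/max collector, while string-like columns keep the LV byte-array path. The same decision tree as a hedged sketch with illustrative enum names:

enum PageKind { DECIMAL_PAGE, TYPED_PRIMITIVE_PAGE, LV_BYTES_PAGE }
enum StatsKind { LONG_STRING_LV, PRIMITIVE_MIN_MAX, SHORT_STRING_LV }

final class PageSelectionSketch {
  static PageKind pickPage(boolean isPrimitive, boolean isDecimal) {
    if (isPrimitive) {
      return isDecimal ? PageKind.DECIMAL_PAGE : PageKind.TYPED_PRIMITIVE_PAGE;
    }
    return PageKind.LV_BYTES_PAGE;
  }

  static StatsKind pickStats(boolean isVarchar, boolean isPrimitive) {
    if (isVarchar) return StatsKind.LONG_STRING_LV;       // VARCHAR: int-length LV strings
    if (isPrimitive) return StatsKind.PRIMITIVE_MIN_MAX;  // typed min/max collector
    return StatsKind.SHORT_STRING_LV;                     // default: short-length LV strings
  }
}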

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c2b21a6..3ba1e1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -426,12 +427,81 @@ public final class CarbonDataProcessorUtil {
     return type;
   }
 
-  public static DataType[] getMeasureDataType(int measureCount, DataField[] measureFields) {
-    DataType[] type = new DataType[measureCount];
-    for (int i = 0; i < type.length; i++) {
-      type[i] = measureFields[i].getColumn().getDataType();
+  /**
+   * Get the no dictionary data types of the table
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static DataType[] getNoDictDataTypes(String databaseName, String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<DataType> type = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        type.add(dimensions.get(i).getDataType());
+      }
     }
-    return type;
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
+   * Get the no dictionary sort column mapping of the table
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static boolean[] getNoDictSortColMapping(String databaseName, String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<Boolean> noDicSortColMap = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        if (dimensions.get(i).isSortColumn()) {
+          noDicSortColMap.add(true);
+        } else {
+          noDicSortColMap.add(false);
+        }
+      }
+    }
+    Boolean[] mapping = noDicSortColMap.toArray(new Boolean[noDicSortColMap.size()]);
+    boolean[] noDicSortColMapping = new boolean[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      noDicSortColMapping[i] = mapping[i].booleanValue();
+    }
+    return noDicSortColMapping;
+  }
+
+  /**
+   * Get the data types of the no dictionary sort and no-sort columns
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(String databaseName,
+      String tableName) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    List<DataType> noDictSortType = new ArrayList<>();
+    List<DataType> noDictNoSortType = new ArrayList<>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      if (!dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
+        if (dimensions.get(i).isSortColumn()) {
+          noDictSortType.add(dimensions.get(i).getDataType());
+        } else {
+          noDictNoSortType.add(dimensions.get(i).getDataType());
+        }
+      }
+    }
+    DataType[] noDictSortTypes = noDictSortType.toArray(new DataType[noDictSortType.size()]);
+    DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new DataType[noDictNoSortType.size()]);
+    Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
+    noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
+    noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
+    return noDictSortAndNoSortTypes;
   }
 
   /**

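As a worked example of what the three utilities return, consider a hypothetical table with dimensions name (string, no-dict, sort column), age (int, no-dict, not sort), city (string, dictionary) and ts (timestamp, no-dict, not sort). Tracing the loops above:

// Dimensions in schema order:   { name,  age,   city, ts }
// hasEncoding(DICTIONARY):      { false, false, true, false }
// isSortColumn():               { true,  false, -,    false }
//
// getNoDictDataTypes(db, tbl)      -> [STRING, INT, TIMESTAMP]   (dictionary columns skipped)
// getNoDictSortColMapping(db, tbl) -> [true, false, false]       (one flag per no-dict column)
// getNoDictSortAndNoSortDataTypes(db, tbl):
//   "noDictSortDataTypes"   -> [STRING]
//   "noDictNoSortDataTypes" -> [INT, TIMESTAMP]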

[3/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index cc044cc..f232652 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -46,7 +46,7 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec {
     return new IndexStorageEncoder() {
       @Override
       void encodeIndexStorage(ColumnPage inputPage) {
-        IndexStorage indexStorage =
+        BlockIndexerStorage<byte[][]> indexStorage =
             new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         Compressor compressor = CompressorFactory.getInstance().getCompressor(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index 66f5f1d..f3475fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -47,7 +47,7 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
     return new IndexStorageEncoder() {
       @Override
       void encodeIndexStorage(ColumnPage inputPage) {
-        IndexStorage indexStorage;
+        BlockIndexerStorage<byte[][]> indexStorage;
         byte[][] data = inputPage.getByteArrayPage();
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index a130cbd..15827f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -47,7 +47,7 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
     return new IndexStorageEncoder() {
       @Override
       void encodeIndexStorage(ColumnPage inputPage) {
-        IndexStorage indexStorage;
+        BlockIndexerStorage<byte[][]> indexStorage;
         byte[][] data = inputPage.getByteArrayPage();
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index bce8523..7a1627c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -54,7 +54,7 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
 
       @Override
       protected void encodeIndexStorage(ColumnPage input) {
-        IndexStorage indexStorage;
+        BlockIndexerStorage<byte[][]> indexStorage;
         byte[][] data = input.getByteArrayPage();
         boolean isDictionary = input.isLocalDictGeneratedPage();
         if (isInvertedIndex) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
index 37d9052..96fcc08 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
@@ -31,7 +31,7 @@ import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.SortState;
 
 public abstract class IndexStorageEncoder extends ColumnPageEncoder {
-  IndexStorage indexStorage;
+  BlockIndexerStorage indexStorage;
   byte[] compressedDataPage;
 
   abstract void encodeIndexStorage(ColumnPage inputPage);
@@ -61,7 +61,9 @@ public abstract class IndexStorageEncoder extends ColumnPageEncoder {
         out.writeShort(dataRle);
       }
     }
-    return stream.toByteArray();
+    byte[] result = stream.toByteArray();
+    stream.close();
+    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
index c2b722b..609f17e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
@@ -61,7 +61,8 @@ public class TablePageKey {
   /** update all keys based on the input row */
   public void update(int rowId, CarbonRow row, byte[] mdk) {
     if (hasNoDictionary) {
-      currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      Object[] noDictAndComplexDimension = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      currentNoDictionaryKey = new byte[noDictAndComplexDimension.length][0];
     }
     if (rowId == 0) {
       startKey = mdk;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
index 665860b..29a180b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.statistics;
 
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 // Statistics of dimension and measure column in a TablePage
 public class TablePageStatistics {
@@ -58,8 +59,17 @@ public class TablePageStatistics {
   private void updateDimensionMinMax(EncodedColumnPage[] dimensions) {
     for (int i = 0; i < dimensions.length; i++) {
       SimpleStatsResult stats = dimensions[i].getStats();
-      dimensionMaxValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMax());
-      dimensionMinValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMin());
+      Object min = stats.getMin();
+      Object max = stats.getMax();
+      if (CarbonUtil.isEncodedWithMeta(dimensions[i].getPageMetadata().getEncoders())) {
+        dimensionMaxValue[i] = DataTypeUtil
+            .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(max, stats.getDataType());
+        dimensionMinValue[i] = DataTypeUtil
+            .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(min, stats.getDataType());
+      } else {
+        dimensionMaxValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), max);
+        dimensionMinValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), min);
+      }
       writeMinMaxForDimensions[i] = stats.writeMinMax();
     }
   }
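
For pages encoded with metadata (adaptive-encoded primitives), min/max must be serialized so that later unsigned byte comparison still reflects numeric order. One common technique for this, and plausibly what the no-dictionary min/max conversion does, is big-endian encoding with the sign bit flipped; a hedged sketch for int values (the actual CarbonData byte layout may differ):

import java.nio.ByteBuffer;

final class OrderPreservingBytesSketch {
  // Big-endian int with the sign bit flipped: unsigned lexicographic
  // comparison of the resulting bytes matches signed numeric order.
  static byte[] toOrderedBytes(int value) {
    return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
  }

  public static void main(String[] args) {
    // -5 < 3, and the encoded bytes compare the same way byte-by-byte.
    byte[] a = toOrderedBytes(-5);
    byte[] b = toOrderedBytes(3);
    int cmp = 0;
    for (int i = 0; i < 4 && cmp == 0; i++) {
      cmp = (a[i] & 0xff) - (b[i] & 0xff);
    }
    System.out.println(cmp < 0); // true
  }
}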

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
index 08dd800..3d9de56 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
@@ -20,7 +20,9 @@ package org.apache.carbondata.core.datastore.row;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 // Utility to create and retrieve data from CarbonRow in write step.
 public class WriteStepRowUtil {
@@ -34,7 +36,7 @@ public class WriteStepRowUtil {
   public static final int NO_DICTIONARY_AND_COMPLEX = 1;
   public static final int MEASURE = 2;
 
-  public static CarbonRow fromColumnCategory(int[] dictDimensions, byte[][] noDictAndComplex,
+  public static CarbonRow fromColumnCategory(int[] dictDimensions, Object[] noDictAndComplex,
       Object[] measures) {
     Object[] row = new Object[3];
     row[DICTIONARY_DIMENSION] = dictDimensions;
@@ -43,7 +45,8 @@ public class WriteStepRowUtil {
     return new CarbonRow(row);
   }
 
-  public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentProperties) {
+  public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentProperties,
+      CarbonColumn[] noDicAndComplexColumns) {
     Object[] converted = new Object[3];
 
     // dictionary dimension
@@ -55,8 +58,23 @@ public class WriteStepRowUtil {
     }
     converted[DICTIONARY_DIMENSION] = dictDimensions;
 
+    byte[][] noDictionaryKeys = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
+    Object[] noDictKeys = new Object[noDictionaryKeys.length];
+    for (int i = 0; i < noDictionaryKeys.length; i++) {
+      // in the compaction flow, rows are collected from the result collector as byte[].
+      // Convert the no dictionary columns back to their original data,
+      // as the load flow expects no dictionary columns with original data.
+      if (DataTypeUtil.isPrimitiveColumn(noDicAndComplexColumns[i].getDataType())) {
+        noDictKeys[i] = DataTypeUtil
+            .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeys[i],
+                noDicAndComplexColumns[i].getDataType());
+      } else {
+        noDictKeys[i] = noDictionaryKeys[i];
+      }
+    }
+
     // no dictionary and complex dimension
-    converted[NO_DICTIONARY_AND_COMPLEX] = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
+    converted[NO_DICTIONARY_AND_COMPLEX] = noDictKeys;
 
     // measure
     int measureCount = row.length - 1;
@@ -75,8 +93,8 @@ public class WriteStepRowUtil {
     return keyGenerator.generateKey(getDictDimension(row));
   }
 
-  public static byte[][] getNoDictAndComplexDimension(CarbonRow row) {
-    return (byte[][]) row.getData()[NO_DICTIONARY_AND_COMPLEX];
+  public static Object[] getNoDictAndComplexDimension(CarbonRow row) {
+    return (Object[]) row.getData()[NO_DICTIONARY_AND_COMPLEX];
   }
 
   public static Object[] getMeasure(CarbonRow row) {
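
In the compaction flow rows arrive from the result collector with every no-dictionary value as a byte[], while the load path now expects primitives as typed objects, hence the per-column conversion above. A standalone sketch of that normalization (the decode logic and ColType enum are illustrative; the real conversion is DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn):

import java.nio.ByteBuffer;

final class NoDictKeyNormalizerSketch {
  enum ColType { INT, LONG, STRING } // stand-in for DataType

  // Primitive columns are decoded back to typed values; string-like
  // columns stay as raw bytes.
  static Object normalize(byte[] raw, ColType type) {
    switch (type) {
      case INT:  return ByteBuffer.wrap(raw).getInt();  // assumed big-endian layout
      case LONG: return ByteBuffer.wrap(raw).getLong(); // assumed big-endian layout
      default:   return raw;
    }
  }

  static Object[] normalizeRow(byte[][] rawKeys, ColType[] types) {
    Object[] out = new Object[rawKeys.length];
    for (int i = 0; i < rawKeys.length; i++) {
      out[i] = normalize(rawKeys[i], types[i]);
    }
    return out;
  }
}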

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 7cc2b09..219e7a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -58,7 +58,10 @@ import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -745,4 +748,37 @@ public class QueryUtil {
       }
     }
   }
+
+  /**
+   * Put the data to vector
+   *
+   * @param vector
+   * @param value
+   * @param vectorRow
+   * @param length
+   */
+  public static void putDataToVector(CarbonColumnVector vector, byte[] value, int vectorRow,
+      int length) {
+    DataType dt = vector.getType();
+    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
+      vector.putNull(vectorRow);
+    } else {
+      if (dt == DataTypes.STRING) {
+        vector.putBytes(vectorRow, 0, length, value);
+      } else if (dt == DataTypes.BOOLEAN) {
+        vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
+      } else if (dt == DataTypes.SHORT) {
+        vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
+      } else if (dt == DataTypes.INT) {
+        vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
+      } else if (dt == DataTypes.LONG) {
+        vector.putLong(vectorRow, DataTypeUtil
+            .getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0, length));
+      } else if (dt == DataTypes.TIMESTAMP) {
+        vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 410f67a..8c0ea56 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -645,8 +645,9 @@ public final class FilterUtil {
           continue;
         }
 
-        filterValuesList
-            .add(DataTypeUtil.getMeasureValueBasedOnDataType(result, dataType, carbonMeasure));
+        filterValuesList.add(DataTypeUtil
+            .getMeasureValueBasedOnDataType(result, dataType, carbonMeasure.getScale(),
+                carbonMeasure.getPrecision()));
 
       }
     } catch (Throwable ex) {
@@ -2178,4 +2179,41 @@ public final class FilterUtil {
     }
     return filterExecuter;
   }
+
+  /**
+   * This method is used to compare the filter value with min and max values.
+   * This is used in case of filter queries on no dictionary columns.
+   *
+   * @param filterValue
+   * @param minMaxBytes
+   * @param carbonDimension
+   * @param isMin
+   * @return
+   */
+  public static int compareValues(byte[] filterValue, byte[] minMaxBytes,
+      CarbonDimension carbonDimension, boolean isMin) {
+    DataType dataType = carbonDimension.getDataType();
+    if (DataTypeUtil.isPrimitiveColumn(dataType) && !carbonDimension
+        .hasEncoding(Encoding.DICTIONARY)) {
+      Object value =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minMaxBytes, dataType);
+      // the filter value should lie in the range of min and max, i.e.
+      // min <= filterValue <= max,
+      // so (filterValue - max) should be non-positive
+      Object data = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValue, dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      if (isMin) {
+        return comparator.compare(value, data);
+      } else {
+        return comparator.compare(data, value);
+      }
+    } else {
+      if (isMin) {
+        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(minMaxBytes, filterValue);
+      } else {
+        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValue, minMaxBytes);
+      }
+    }
+  }
+
 }
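
The isMin flag above encodes an argument-order convention: on the min side the block value goes first, so compare(min, filter) <= 0 means filter >= min; on the max side the filter goes first, so compare(filter, max) <= 0 means filter <= max. A tiny illustration with plain integers:

import java.util.Comparator;

final class MinMaxCompareSketch {
  public static void main(String[] args) {
    Comparator<Integer> cmp = Integer::compare;
    int min = 5, max = 10, filter = 7;
    // min side: block value first -> <= 0 when the filter is not below min
    System.out.println(cmp.compare(min, filter) <= 0); // true, 7 >= 5
    // max side: filter value first -> <= 0 when the filter is not above max
    System.out.println(cmp.compare(filter, max) <= 0); // true, 7 <= 10
  }
}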

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 7017f21..04264f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -397,7 +397,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
       return bitSet;
     }
     // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int startIndex = 0;
       for (int i = 0; i < filterValues.length; i++) {
         if (startIndex >= numerOfRows) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index b9df60c..ddc9751 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.filter.executer;
 import java.io.IOException;
 import java.util.BitSet;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
@@ -110,9 +112,20 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       boolean isDecoded = false;
       for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
         if (dimensionRawColumnChunk.getMaxValues() != null) {
-          if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
+          boolean scanRequired;
+          // for a no dictionary primitive column, comparison can be done
+          // on the original data, as is done for a measure column
+          if (DataTypeUtil.isPrimitiveColumn(dimColumnEvaluatorInfo.getDimension().getDataType())
+              && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+            scanRequired = isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
+                dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys(),
+                dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else {
+            scanRequired = isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
               dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys(),
-              dimensionRawColumnChunk.getMinMaxFlagArray()[i])) {
+              dimensionRawColumnChunk.getMinMaxFlagArray()[i]);
+          }
+          if (scanRequired) {
             DimensionColumnPage dimensionColumnPage = dimensionRawColumnChunk.decodeColumnPage(i);
             if (!isDecoded) {
               filterValues =  FilterUtil
@@ -413,7 +426,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     }
     // binary search can only be applied if column is sorted and
     // inverted index exists for that column
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int startIndex = 0;
       for (int i = 0; i < filterValues.length; i++) {
         if (startIndex >= numerOfRows) {
@@ -459,8 +472,17 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     if (isDimensionPresentInCurrentBlock) {
       filterValues = dimColumnExecuterInfo.getFilterKeys();
       chunkIndex = dimColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
-      isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex], filterValues,
+      // for a no dictionary primitive column, comparison can be done
+      // on the actual data, as is done for measure columns
+      if (DataTypeUtil
+          .isPrimitiveColumn(dimColumnEvaluatorInfo.getDimension().getDataType())
+          && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex], filterValues,
+            dimColumnEvaluatorInfo.getDimension().getDataType());
+      } else {
+        isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex], filterValues,
           isMinMaxSet[chunkIndex]);
+      }
     } else if (isMeasurePresentInCurrentBlock) {
       chunkIndex = msrColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
       isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
@@ -501,6 +523,34 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] blkMaxVal, byte[] blkMinVal, byte[][] filterValues,
+      DataType dataType) {
+    boolean isScanRequired = false;
+    Object minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blkMinVal, dataType);
+    Object maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blkMaxVal, dataType);
+    for (int k = 0; k < filterValues.length; k++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) {
+        return true;
+      }
+      // the filter value should lie within the block min and max,
+      // i.e. min <= filterValue <= max, so compare(filterValue, max)
+      // must be <= 0 and compare(filterValue, min) must be >= 0
+      Object data =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      int maxCompare = comparator.compare(data, maxValue);
+      int minCompare = comparator.compare(data, minValue);
+      // if any filter value is in range then this block needs to be
+      // scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
   private boolean isScanRequired(byte[] maxValue, byte[] minValue, Object[] filterValue,
       DataType dataType) {
     Object maxObject = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);

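For context on the typed pruning path added above: instead of comparing raw sort-key bytes, the block min/max are decoded back to their actual values and checked with a data-type-aware comparator. Below is a minimal, self-contained sketch of that decision, with plain ints standing in for the decoded objects (the class and helper names are illustrative, not CarbonData APIs):

import java.util.Comparator;

public class MinMaxPruneSketch {
  // a block must be scanned if any filter value lies within its
  // [blockMin, blockMax] range, since matching rows may exist there
  static <T> boolean isScanRequired(T blockMin, T blockMax, T[] filterValues,
      Comparator<T> comparator) {
    for (T filterValue : filterValues) {
      if (comparator.compare(filterValue, blockMax) <= 0
          && comparator.compare(filterValue, blockMin) >= 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Integer[] filters = { 15, 99 };
    // block range [10, 20]: 15 falls inside, so the block is scanned
    System.out.println(isScanRequired(10, 20, filters, Integer::compare)); // true
    // block range [30, 40]: 15 and 99 both fall outside, block is pruned
    System.out.println(isScanRequired(30, 40, filters, Integer::compare)); // false
  }
}
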
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 7a0a386..b9729db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -121,13 +121,13 @@ public class RangeValueFilterExecuterImpl implements FilterExecuter {
       CarbonDimension dimension = this.dimColEvaluatorInfo.getDimension();
       byte[] defaultValue = dimension.getDefaultValue();
       if (null != defaultValue) {
-        int maxCompare =
-            ByteUtil.UnsafeComparer.INSTANCE.compareTo(defaultValue, filterRangesValues[0]);
         int minCompare =
-            ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangesValues[1], defaultValue);
+            FilterUtil.compareValues(filterRangesValues[0], defaultValue, dimension, true);
+        int maxCompare =
+            FilterUtil.compareValues(filterRangesValues[1], defaultValue, dimension, false);
 
-        if (((greaterThanExp && maxCompare > 0) || (greaterThanEqualExp && maxCompare >= 0))
-            && ((lessThanExp && minCompare > 0) || (lessThanEqualExp && minCompare >= 0))) {
+        if (((greaterThanExp && maxCompare > 0) || (greaterThanEqualExp && maxCompare >= 0)) && (
+            (lessThanExp && minCompare > 0) || (lessThanEqualExp && minCompare >= 0))) {
           isDefaultValuePresentInFilter = true;
         }
       }
@@ -275,30 +275,35 @@ public class RangeValueFilterExecuterImpl implements FilterExecuter {
     // Case D: Filter Values Completely overlaps Block Min and Max then all bits are set.
     //                       Block Min <-----------------------> Block Max
     //         Filter Min <-----------------------------------------------> Filter Max
+    // for a no dictionary primitive column, comparison can be done
+    // on the actual data, as is done for measure columns
 
     if (isDimensionPresentInCurrentBlock) {
+      CarbonDimension carbonDimension = dimColEvaluatorInfo.getDimension();
       if (((lessThanExp) && (
-          ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockMinValue, filterValues[1]) >= 0)) || (
-          (lessThanEqualExp) && (
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockMinValue, filterValues[1]) > 0)) || (
+          FilterUtil.compareValues(filterValues[1], blockMinValue, carbonDimension, true) >= 0))
+          || ((lessThanEqualExp) && (
+          FilterUtil.compareValues(filterValues[1], blockMinValue, carbonDimension, true) > 0)) || (
           (greaterThanExp) && (
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[0], blockMaxValue) >= 0)) || (
-          (greaterThanEqualExp) && (
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[0], blockMaxValue) > 0))) {
+              FilterUtil.compareValues(filterValues[0], blockMaxValue, carbonDimension, false)
+                  >= 0)) || ((greaterThanEqualExp) && (
+          FilterUtil.compareValues(filterValues[0], blockMaxValue, carbonDimension, false) > 0))) {
         // completely out of block boundary
         isScanRequired = false;
       } else {
         if (((greaterThanExp) && (
-            ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockMinValue, filterValues[0]) > 0)) || (
-            (greaterThanEqualExp) && (
-                ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockMinValue, filterValues[0]) >= 0))) {
+            FilterUtil.compareValues(filterValues[0], blockMinValue, carbonDimension, true) > 0))
+            || ((greaterThanEqualExp) && (
+            FilterUtil.compareValues(filterValues[0], blockMinValue, carbonDimension, true)
+                >= 0))) {
           startBlockMinIsDefaultStart = true;
         }
 
         if (((lessThanExp) && (
-            ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[1], blockMaxValue) > 0)) || (
-            (lessThanEqualExp) && (
-                ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[1], blockMaxValue) >= 0))) {
+            FilterUtil.compareValues(filterValues[1], blockMaxValue, carbonDimension, false) > 0))
+            || ((lessThanEqualExp) && (
+            FilterUtil.compareValues(filterValues[1], blockMaxValue, carbonDimension, false)
+                >= 0))) {
           endBlockMaxisDefaultEnd = true;
         }
 

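The Case A-D enumeration above boils down to an interval-overlap test between the filter range and the block's [min, max]. A minimal sketch with closed bounds follows (names illustrative; the real code additionally distinguishes strict and inclusive bounds via the four *Exp flags):

public class RangeOverlapSketch {
  // no overlap means the block can be skipped entirely;
  // full coverage means every row in the block matches
  static boolean overlaps(int blockMin, int blockMax, int filterMin, int filterMax) {
    return filterMin <= blockMax && filterMax >= blockMin;
  }

  static boolean fullyCovers(int blockMin, int blockMax, int filterMin, int filterMax) {
    return filterMin <= blockMin && filterMax >= blockMax;
  }

  public static void main(String[] args) {
    System.out.println(overlaps(10, 20, 15, 25));    // true: partial overlap
    System.out.println(overlaps(10, 20, 21, 30));    // false: completely outside
    System.out.println(fullyCovers(10, 20, 5, 25));  // true: Case D, all bits set
  }
}
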
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index f901238..c403846 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -29,9 +29,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.comparator.Comparator;
 import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
@@ -65,7 +65,7 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
       }
       List<byte[]> noDictionaryFilterValuesList = filterValues.getNoDictionaryFilterValuesList();
       for (byte[] filterValue : noDictionaryFilterValuesList) {
-        int compare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(defaultValue, filterValue);
+        int compare = FilterUtil.compareValues(filterValue, defaultValue, dimension, true);
         if (compare == 0) {
           isDefaultValuePresentInFilterValues = true;
           break;

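FilterUtil.compareValues itself is not part of this hunk; judging from the call sites, its role is to switch between raw byte comparison (string-like no dictionary columns) and typed comparison on decoded values (primitive no dictionary columns), with the boolean flag selecting the min or max side. A hedged sketch of such a dispatch, not the actual FilterUtil implementation:

public class CompareDispatchSketch {
  // string-like columns: lexicographic comparison on unsigned bytes,
  // matching what ByteUtil.UnsafeComparer does
  static int compareBytes(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  // primitive columns: decode first, then compare actual values
  // (ints stand in for the decoded objects here)
  static int compareValues(byte[] filterValue, byte[] defaultValue, boolean isPrimitive) {
    if (isPrimitive) {
      int a = java.nio.ByteBuffer.wrap(filterValue).getInt();
      int b = java.nio.ByteBuffer.wrap(defaultValue).getInt();
      return Integer.compare(a, b);
    }
    return compareBytes(filterValue, defaultValue);
  }
}
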
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index d5f4ee2..a5faacc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -406,12 +407,11 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
         DimensionColumnPage columnDataChunk =
             blockChunkHolder.getDimensionRawColumnChunks()[dimensionChunkIndex[i]]
                 .decodeColumnPage(pageIndex);
-        if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
-            && columnDataChunk instanceof VariableLengthDimensionColumnPage) {
+        if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY) && (
+            columnDataChunk instanceof VariableLengthDimensionColumnPage
+                || columnDataChunk instanceof ColumnPageWrapper)) {
 
-          VariableLengthDimensionColumnPage dimensionColumnDataChunk =
-              (VariableLengthDimensionColumnPage) columnDataChunk;
-          byte[] memberBytes = dimensionColumnDataChunk.getChunkData(index);
+          byte[] memberBytes = columnDataChunk.getChunkData(index);
           if (null != memberBytes) {
             if (Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, memberBytes)) {
               memberBytes = null;

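The rewrite above matters because adaptive-encoded primitive pages come back wrapped in ColumnPageWrapper, which is not a VariableLengthDimensionColumnPage; reading through the shared getChunkData(int) contract serves both layouts with one code path. A minimal sketch of the pattern (interface and stand-in implementations are illustrative):

public class ChunkReadSketch {
  interface Page {
    byte[] getChunkData(int rowId);
  }

  // both page layouts satisfy the same contract, so callers need
  // neither instanceof checks nor casts per concrete type
  static byte[] readMember(Page page, int rowId) {
    return page.getChunkData(rowId);
  }

  public static void main(String[] args) {
    Page variableLength = rowId -> new byte[] { 1 }; // stand-in for one layout
    Page wrapped = rowId -> new byte[] { 2 };        // stand-in for the other
    System.out.println(readMember(variableLength, 0)[0] + ", "
        + readMember(wrapped, 0)[0]); // 1, 2
  }
}
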
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 31ab42e..63a5976 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -126,8 +126,16 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
             isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         maxValue = blockMaxValue[dimensionChunkIndex[0]];
-        isScanRequired =
-            isScanRequired(maxValue, filterRangeValues, isMinMaxSet[dimensionChunkIndex[0]]);
+        DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+        // for a no dictionary primitive column, comparison can be done
+        // on the actual data, as is done for measure columns
+        if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+            .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+          isScanRequired = isScanRequired(maxValue, filterRangeValues, dataType);
+        } else {
+          isScanRequired =
+              isScanRequired(maxValue, filterRangeValues, isMinMaxSet[dimensionChunkIndex[0]]);
+        }
       }
     } else {
       isScanRequired = isDefaultValuePresentInFilter;
@@ -161,6 +169,32 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] blockMaxValue, byte[][] filterValues, DataType dataType) {
+    boolean isScanRequired = false;
+    Object maxValue =
+        DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blockMaxValue, dataType);
+    for (int k = 0; k < filterValues.length; k++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) {
+        return true;
+      }
+      // for a greater-than filter the block can match only if some
+      // filter value is less than or equal to the block max,
+      // i.e. compare(filterValue, max) <= 0
+      Object data =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      int maxCompare = comparator.compare(data, maxValue);
+      // if any filter value is less than or equal to the block max,
+      // this block needs to be scanned
+      if (maxCompare <= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
   private boolean isScanRequired(byte[] maxValue, Object[] filterValue,
       DataType dataType) {
     Object value = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);
@@ -201,8 +235,19 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       boolean isExclude = false;
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMaxValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMaxValues()[i],
-              this.filterRangeValues, rawColumnChunk.getMinMaxFlagArray()[i])) {
+          boolean scanRequired;
+          DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+          // for a no dictionary primitive column, comparison can be done
+          // on the actual data, as is done for measure columns
+          if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+              .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+            scanRequired =
+                isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues, dataType);
+          } else {
+            scanRequired = isScanRequired(rawColumnChunk.getMaxValues()[i],
+              this.filterRangeValues, rawColumnChunk.getMinMaxFlagArray()[i]);
+          }
+          if (scanRequired) {
             int compare = ByteUtil.UnsafeComparer.INSTANCE
                 .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
             if (compare < 0) {
@@ -423,7 +468,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     BitSet bitSet = new BitSet(numerOfRows);
     byte[][] filterValues = this.filterRangeValues;
     // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int start = 0;
       int last = 0;
       int startIndex = 0;

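For a greater-than filter only the block max is decisive: if every filter value already exceeds the block max, no row in the block can be greater than any of them. A minimal numeric sketch of that one-sided check (plain ints stand in for the decoded max object):

public class GreaterThanPruneSketch {
  // scan is required if some filter value is <= block max, because rows
  // strictly greater than that filter value may then exist in the block
  static boolean isScanRequired(int blockMax, int[] filterValues) {
    for (int filterValue : filterValues) {
      if (filterValue <= blockMax) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isScanRequired(20, new int[] { 25 })); // false: pruned
    System.out.println(isScanRequired(20, new int[] { 5 }));  // true: rows in (5, 20] may match
  }
}

The less-than executers further down mirror this with the block min and a reversed comparison.
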
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 8944a51..0f9cfae 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -86,7 +86,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       if (null != defaultValue) {
         for (int k = 0; k < filterRangeValues.length; k++) {
           int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+              FilterUtil.compareValues(filterRangeValues[k], defaultValue, dimension, false);
           if (maxCompare <= 0) {
             isDefaultValuePresentInFilter = true;
             break;
@@ -124,8 +124,16 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
             isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         maxValue = blockMaxValue[dimensionChunkIndex[0]];
-        isScanRequired =
+        DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+        // for a no dictionary primitive column, comparison can be done
+        // on the actual data, as is done for measure columns
+        if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+            .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+          isScanRequired = isScanRequired(maxValue, filterRangeValues, dataType);
+        } else {
+          isScanRequired =
             isScanRequired(maxValue, filterRangeValues, isMinMaxSet[dimensionChunkIndex[0]]);
+        }
       }
     } else {
       isScanRequired = isDefaultValuePresentInFilter;
@@ -158,6 +166,32 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] blockMaxValue, byte[][] filterValues, DataType dataType) {
+    boolean isScanRequired = false;
+    Object maxValue =
+        DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blockMaxValue, dataType);
+    for (int k = 0; k < filterValues.length; k++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) {
+        return true;
+      }
+      // for a greater-than-equals filter the block can match only if some
+      // filter value is less than or equal to the block max,
+      // i.e. compare(filterValue, max) <= 0
+      Object data =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      int maxCompare = comparator.compare(data, maxValue);
+      // if any filter value is less than or equal to the block max,
+      // this block needs to be scanned
+      if (maxCompare <= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
   private boolean isScanRequired(byte[] maxValue, Object[] filterValue,
       DataType dataType) {
     Object value = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);
@@ -199,8 +233,19 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       boolean isExclude = false;
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMaxValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues,
-              rawColumnChunk.getMinMaxFlagArray()[i])) {
+          boolean scanRequired;
+          DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+          // for a no dictionary primitive column, comparison can be done
+          // on the actual data, as is done for measure columns
+          if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+              .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+            scanRequired =
+                isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues, dataType);
+          } else {
+            scanRequired = isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues,
+              rawColumnChunk.getMinMaxFlagArray()[i]);
+          }
+          if (scanRequired) {
             int compare = ByteUtil.UnsafeComparer.INSTANCE
                 .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
             if (compare <= 0) {
@@ -411,7 +456,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     BitSet bitSet = new BitSet(numerOfRows);
     byte[][] filterValues = this.filterRangeValues;
     // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int start = 0;
       int last = 0;
       int startIndex = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 038d50b..eff6509 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -127,8 +127,16 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
             isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         minValue = blockMinValue[dimensionChunkIndex[0]];
-        isScanRequired =
+        DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+        // for a no dictionary primitive column, comparison can be done
+        // on the actual data, as is done for measure columns
+        if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+            .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+          isScanRequired = isScanRequired(minValue, filterRangeValues, dataType);
+        } else {
+          isScanRequired =
             isScanRequired(minValue, filterRangeValues, isMinMaxSet[dimensionChunkIndex[0]]);
+        }
       }
     } else {
       isScanRequired = isDefaultValuePresentInFilter;
@@ -160,6 +168,32 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues, DataType dataType) {
+    boolean isScanRequired = false;
+    Object minValue =
+        DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blockMinValue, dataType);
+    for (int k = 0; k < filterValues.length; k++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) {
+        return true;
+      }
+      // for a less-than-equals filter the block can match only if some
+      // filter value is greater than or equal to the block min,
+      // i.e. compare(filterValue, min) >= 0
+      Object data =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      int minCompare = comparator.compare(data, minValue);
+      // if any filter value is greater than or equal to the block min,
+      // this block needs to be scanned
+      if (minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
   private boolean isScanRequired(byte[] minValue, Object[] filterValue,
       DataType dataType) {
     Object value =
@@ -201,8 +235,19 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       boolean isExclude = false;
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
-              rawColumnChunk.getMinMaxFlagArray()[i])) {
+          boolean scanRequired;
+          DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+          // for a no dictionary primitive column, comparison can be done
+          // on the actual data, as is done for measure columns
+          if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+              .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+            scanRequired =
+                isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues, dataType);
+          } else {
+            scanRequired = isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
+              rawColumnChunk.getMinMaxFlagArray()[i]);
+          }
+          if (scanRequired) {
             BitSet bitSet;
             DimensionColumnPage dimensionColumnPage = rawColumnChunk.decodeColumnPage(i);
             if (null != rawColumnChunk.getLocalDictionary()) {
@@ -433,7 +478,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     BitSet bitSet = new BitSet(numerOfRows);
     byte[][] filterValues = this.filterRangeValues;
     // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int start = 0;
       int last = 0;
       int startIndex = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
index fddb4ff..f0773d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
@@ -127,8 +127,16 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu
             isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         minValue = blockMinValue[dimensionChunkIndex[0]];
-        isScanRequired =
+        DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+        // for a no dictionary primitive column, comparison can be done
+        // on the actual data, as is done for measure columns
+        if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+            .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+          isScanRequired = isScanRequired(minValue, filterRangeValues, dataType);
+        } else {
+          isScanRequired =
             isScanRequired(minValue, filterRangeValues, isMinMaxSet[dimensionChunkIndex[0]]);
+        }
       }
     } else {
       isScanRequired = isDefaultValuePresentInFilter;
@@ -161,6 +169,32 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues, DataType dataType) {
+    boolean isScanRequired = false;
+    Object minValue =
+        DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(blockMinValue, dataType);
+    for (int k = 0; k < filterValues.length; k++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(filterValues[k], CarbonCommonConstants.EMPTY_BYTE_ARRAY) == 0) {
+        return true;
+      }
+      // for a less-than filter the block can match only if some
+      // filter value is greater than or equal to the block min,
+      // i.e. compare(filterValue, min) >= 0
+      Object data =
+          DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(filterValues[k], dataType);
+      SerializableComparator comparator = Comparator.getComparator(dataType);
+      int minCompare = comparator.compare(data, minValue);
+      // if any filter value is greater than or equal to the block min,
+      // this block needs to be scanned
+      if (minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
   private boolean isScanRequired(byte[] minValue, Object[] filterValue,
       DataType dataType) {
     Object value = DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
@@ -201,8 +235,19 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu
       boolean isExclude = false;
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
-              rawColumnChunk.getMinMaxFlagArray()[i])) {
+          boolean scanRequired;
+          DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
+          // for a no dictionary primitive column, comparison can be done
+          // on the actual data, as is done for measure columns
+          if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0)
+              .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+            scanRequired =
+                isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues, dataType);
+          } else {
+            scanRequired = isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
+              rawColumnChunk.getMinMaxFlagArray()[i]);
+          }
+          if (scanRequired) {
             BitSet bitSet;
             DimensionColumnPage dimensionColumnPage = rawColumnChunk.decodeColumnPage(i);
             if (null != rawColumnChunk.getLocalDictionary()) {
@@ -441,7 +486,7 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu
     BitSet bitSet = new BitSet(numerOfRows);
     byte[][] filterValues = this.filterRangeValues;
     // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
+    if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int start = 0;
       int last = 0;
       int startIndex = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 2bc73c5..4a713d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -210,7 +210,7 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
           continue;
         }
         filterValuesList.add(DataTypeUtil.getMeasureValueBasedOnDataType(result.getString(),
-            result.getDataType(), carbonMeasure));
+            result.getDataType(), carbonMeasure.getScale(), carbonMeasure.getPrecision()));
       } catch (FilterIllegalMemberException e) {
         // Any invalid member while evaluation shall be ignored, system will log the
         // error only once since all rows the evaluation happens so inorder to avoid

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index daf9a91..0f9ba22 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -240,10 +240,9 @@ public abstract class BlockletScannedResult {
    * Fill the column data to vector
    */
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
-    int column = 0;
     for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
-      column = dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
-          .fillVector(vectorInfo, column);
+      dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+          .fillVector(vectorInfo, i);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 26b1135..1b83110 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -110,12 +110,9 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
    * Fill the column data to vector
    */
   public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
-    int column = 0;
-    for (int chunkIndex : this.noDictionaryColumnChunkIndexes) {
-      column = dimensionColumnPages[chunkIndex][pageCounter].fillVector(
-          pageFilteredRowId[pageCounter],
-          vectorInfo,
-          column);
+    for (int index = 0; index < this.noDictionaryColumnChunkIndexes.length; index++) {
+      dimensionColumnPages[noDictionaryColumnChunkIndexes[index]][pageCounter]
+          .fillVector(pageFilteredRowId[pageCounter], vectorInfo, index);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
new file mode 100644
index 0000000..00e7dee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+public class CarbonUnsafeUtil {
+
+  /**
+   * Put the data to unsafe memory
+   *
+   * @param dataType data type of the value to write
+   * @param data value to write
+   * @param baseObject base object of the target unsafe memory block
+   * @param address base address of the target unsafe memory block
+   * @param size offset from the base address at which to write
+   * @param sizeInBytes length in bytes, used for byte array data
+   */
+  public static void putDataToUnsafe(DataType dataType, Object data, Object baseObject,
+      long address, int size, int sizeInBytes) {
+    dataType = DataTypeUtil.valueOf(dataType.getName());
+    if (dataType == DataTypes.BOOLEAN) {
+      CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, (boolean) data);
+    } else if (dataType == DataTypes.BYTE) {
+      CarbonUnsafe.getUnsafe().putByte(baseObject, address + size, (byte) data);
+    } else if (dataType == DataTypes.SHORT) {
+      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) data);
+    } else if (dataType == DataTypes.INT) {
+      CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, (int) data);
+    } else if (dataType == DataTypes.LONG) {
+      CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, (long) data);
+    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) {
+      CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, (double) data);
+    } else if (dataType == DataTypes.FLOAT) {
+      CarbonUnsafe.getUnsafe().putFloat(baseObject, address + size, (float) data);
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(data, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+              sizeInBytes);
+    }
+  }
+
+  /**
+   * Retrieve/Get the data from unsafe memory
+   *
+   * @param dataType data type of the value to read
+   * @param baseObject base object of the source unsafe memory block
+   * @param address base address of the source unsafe memory block
+   * @param size offset from the base address at which to read
+   * @param sizeInBytes length in bytes, used for byte array data
+   * @return the value read, boxed as an Object
+   */
+  public static Object getDataFromUnsafe(DataType dataType, Object baseObject, long address,
+      int size, int sizeInBytes) {
+    dataType = DataTypeUtil.valueOf(dataType.getName());
+    Object data = null;
+    if (dataType == DataTypes.BOOLEAN) {
+      data = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
+    } else if (dataType == DataTypes.BYTE) {
+      data = CarbonUnsafe.getUnsafe().getByte(baseObject, address + size);
+    } else if (dataType == DataTypes.SHORT) {
+      data = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+    } else if (dataType == DataTypes.INT) {
+      data = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+    } else if (dataType == DataTypes.LONG) {
+      data = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) {
+      data = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+    } else if (dataType == DataTypes.FLOAT) {
+      // read back as float; reading a double here would consume 8 bytes
+      // where putDataToUnsafe wrote only 4
+      data = CarbonUnsafe.getUnsafe().getFloat(baseObject, address + size);
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      // copy into a correctly sized byte array; copying into a plain
+      // Object would corrupt memory
+      data = new byte[sizeInBytes];
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(baseObject, address + size, data, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+              sizeInBytes);
+    }
+    return data;
+  }
+}

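A quick round trip through the two new helpers, assuming an off-heap block allocated via the same CarbonUnsafe handle (allocateMemory/freeMemory go straight to sun.misc.Unsafe, and null is the base object for absolute addresses):

import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUnsafeUtil;

public class UnsafeRoundTrip {
  public static void main(String[] args) {
    long address = CarbonUnsafe.getUnsafe().allocateMemory(8);
    try {
      // write an int at offset 0, then read it back
      CarbonUnsafeUtil.putDataToUnsafe(DataTypes.INT, 42, null, address, 0, 4);
      Object value = CarbonUnsafeUtil.getDataFromUnsafe(DataTypes.INT, null, address, 0, 4);
      System.out.println(value); // 42
    } finally {
      CarbonUnsafe.getUnsafe().freeMemory(address);
    }
  }
}
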
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index dc03944..9ab875c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3243,6 +3243,28 @@ public final class CarbonUtil {
   }
 
   /**
+   * Check if the page is adaptive encoded, i.e. encoded on the actual
+   * values with min/max page metadata
+   *
+   * @param encodings encoding list of the column page
+   * @return true if the first encoding is one of the adaptive/meta encodings
+   */
+  public static boolean isEncodedWithMeta(List<org.apache.carbondata.format.Encoding> encodings) {
+    if (encodings != null && !encodings.isEmpty()) {
+      org.apache.carbondata.format.Encoding encoding = encodings.get(0);
+      switch (encoding) {
+        case DIRECT_COMPRESS:
+        case DIRECT_STRING:
+        case ADAPTIVE_INTEGRAL:
+        case ADAPTIVE_DELTA_INTEGRAL:
+        case ADAPTIVE_FLOATING:
+        case ADAPTIVE_DELTA_FLOATING:
+          return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Check whether it is standard table means tablepath has Fact/Part0/Segment_ tail present with
    * all carbon files. In other cases carbon files present directly under tablepath or
    * tablepath/partition folder

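A small usage sketch of the new check; the adaptive encoding values are the ones listed in the switch above, and the sketch assumes the thrift-generated org.apache.carbondata.format.Encoding enum:

import java.util.Arrays;
import java.util.Collections;

import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.Encoding;

public class EncodedWithMetaSketch {
  public static void main(String[] args) {
    // adaptive/meta encodings keep min/max on actual values
    System.out.println(CarbonUtil.isEncodedWithMeta(
        Arrays.asList(Encoding.ADAPTIVE_INTEGRAL)));   // true
    // anything else, e.g. a dictionary page, is not
    System.out.println(CarbonUtil.isEncodedWithMeta(
        Arrays.asList(Encoding.DICTIONARY)));          // false
    // null and empty lists are handled defensively
    System.out.println(CarbonUtil.isEncodedWithMeta(
        Collections.<Encoding>emptyList()));           // false
  }
}
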
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index fa08df9..612e17c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -82,8 +82,8 @@ public final class DataTypeUtil {
    * @return
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      CarbonMeasure carbonMeasure) {
-    return getMeasureValueBasedOnDataType(msrValue, dataType,carbonMeasure, false);
+      int scale, int precision) {
+    return getMeasureValueBasedOnDataType(msrValue, dataType, scale, precision, false);
   }
 
   /**
@@ -95,13 +95,13 @@ public final class DataTypeUtil {
    * @return
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      CarbonMeasure carbonMeasure, boolean useConverter) {
+      int scale, int precision, boolean useConverter) {
     if (dataType == DataTypes.BOOLEAN) {
       return BooleanConvert.parseBoolean(msrValue);
     } else if (DataTypes.isDecimal(dataType)) {
       BigDecimal bigDecimal =
-          new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
-      BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
+          new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
+      BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
         return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
@@ -415,6 +415,38 @@ public final class DataTypeUtil {
   }
 
   /**
+   * Convert the min/max values to bytes for no dictionary column
+   *
+   * @param dimensionValue min/max value of the column
+   * @param actualDataType actual data type of the column
+   * @return serialized min/max value
+   */
+  public static byte[] getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(Object dimensionValue,
+      DataType actualDataType) {
+    if (dimensionValue == null) {
+      if (actualDataType == DataTypes.STRING) {
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        return new byte[0];
+      }
+    }
+    if (actualDataType == DataTypes.BOOLEAN) {
+      return ByteUtil.toBytes(Boolean.valueOf(ByteUtil.toBoolean((byte) dimensionValue)));
+    } else if (actualDataType == DataTypes.SHORT) {
+      return ByteUtil.toXorBytes((Short) dimensionValue);
+    } else if (actualDataType == DataTypes.INT) {
+      return ByteUtil.toXorBytes((Integer) dimensionValue);
+    } else if (actualDataType == DataTypes.LONG) {
+      return ByteUtil.toXorBytes((Long) dimensionValue);
+    } else if (actualDataType == DataTypes.TIMESTAMP) {
+      return ByteUtil.toXorBytes((Long) dimensionValue);
+    } else {
+      // Default action for String/Varchar
+      return ByteUtil.toBytes(dimensionValue.toString());
+    }
+  }
+
+  /**
    * Returns true for fixed length DataTypes.
    * @param dataType
    * @return
@@ -976,4 +1008,20 @@ public final class DataTypeUtil {
     return value;
   }
 
+  /**
+   * Check if the data type is primitive and hence eligible for
+   * no dictionary primitive column handling
+   *
+   * @param dataType data type of the column
+   * @return true if the data type is primitive
+   */
+  public static boolean isPrimitiveColumn(DataType dataType) {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE || dataType == DataTypes.SHORT
+        || dataType == DataTypes.INT || dataType == DataTypes.LONG || DataTypes.isDecimal(dataType)
+        || dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE
+        || dataType == DataTypes.BYTE_ARRAY) {
+      return true;
+    }
+    return false;
+  }
+
 }

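Tying the two new utilities together: a no dictionary INT column first passes the isPrimitiveColumn gate, and its min/max values are then serialized via ByteUtil.toXorBytes, whose sign-bit flip makes unsigned byte comparison agree with numeric order. A minimal sketch:

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

public class MinMaxBytesSketch {
  public static void main(String[] args) {
    // only primitive data types take the typed min/max path
    System.out.println(DataTypeUtil.isPrimitiveColumn(DataTypes.INT));    // true
    System.out.println(DataTypeUtil.isPrimitiveColumn(DataTypes.STRING)); // false

    byte[] min = DataTypeUtil
        .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(-5, DataTypes.INT);
    byte[] max = DataTypeUtil
        .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(10, DataTypes.INT);
    // byte order matches numeric order thanks to the XOR on the sign bit
    System.out.println(ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, max) < 0); // true
  }
}
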
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
index fca1244..a51d6d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -116,7 +116,7 @@ public class NonDictionaryUtil {
     return measures[index];
   }
 
-  public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
+  public static void prepareOutObj(Object[] out, int[] dimArray, Object[] byteBufferArr,
       Object[] measureArray) {
     out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
     out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
index 0b8bcc7..3f1c819 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
@@ -38,25 +39,25 @@ public class TestEncodingFactory extends TestCase {
     // for Byte
     primitivePageStatsCollector.update((long) Byte.MAX_VALUE);
     ColumnPageCodec columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.BYTE == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for Short
     primitivePageStatsCollector.update((long) Short.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.SHORT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update((long) Integer.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for long
     primitivePageStatsCollector.update(Long.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof DirectCompressCodec);
     assert ("DirectCompressCodec".equals(columnPageCodec.getName()));
   }
@@ -67,25 +68,25 @@ public class TestEncodingFactory extends TestCase {
     // for Byte
     primitivePageStatsCollector.update((long) 200);
     ColumnPageCodec columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveDeltaIntegralCodec);
     assert (DataTypes.BYTE == ((AdaptiveDeltaIntegralCodec) columnPageCodec).getTargetDataType());
     // for Short
     primitivePageStatsCollector.update((long) 634767);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.SHORT_INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update((long) (Integer.MAX_VALUE + 200));
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update(Long.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false, null);
     assert (columnPageCodec instanceof DirectCompressCodec);
     assert ("DirectCompressCodec".equals(columnPageCodec.getName()));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index 720e954..e69fa9e 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -62,13 +62,13 @@ public class DataTypeUtilTest {
   @Test public void testGetMeasureValueBasedOnDataType() {
     ColumnSchema columnSchema = new ColumnSchema();
     CarbonMeasure carbonMeasure = new CarbonMeasure(columnSchema, 1);
-    Object resultInt = getMeasureValueBasedOnDataType("1", DataTypes.INT, carbonMeasure);
+    Object resultInt = getMeasureValueBasedOnDataType("1", DataTypes.INT, carbonMeasure.getScale(), carbonMeasure.getPrecision());
     Object expectedInt = Double.valueOf(1).intValue();
     assertEquals(expectedInt, resultInt);
-    Object resultLong = getMeasureValueBasedOnDataType("1", DataTypes.LONG, carbonMeasure);
+    Object resultLong = getMeasureValueBasedOnDataType("1", DataTypes.LONG, carbonMeasure.getScale(), carbonMeasure.getPrecision());
     Object expectedLong = Long.valueOf(1);
     assertEquals(expectedLong, resultLong);
-    Object resultDefault = getMeasureValueBasedOnDataType("1", DataTypes.DOUBLE, carbonMeasure);
+    Object resultDefault = getMeasureValueBasedOnDataType("1", DataTypes.DOUBLE, carbonMeasure.getScale(), carbonMeasure.getPrecision());
     Double expectedDefault = Double.valueOf(1);
     assertEquals(expectedDefault, resultDefault);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
index 3fdce4e..ee74bf2 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
@@ -17,10 +17,13 @@
 
 package org.apache.carbondata.core.util;
 
+import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -37,9 +40,14 @@ import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 
 import mockit.Deencapsulation;
+import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -47,7 +55,15 @@ import org.junit.Test;
 
 /* Test Cases for Range Filter */
 public class RangeFilterProcessorTest {
+
+  public static DimColumnResolvedFilterInfo dimColumnResolvedFilterInfo =
+      new DimColumnResolvedFilterInfo();
+
   @BeforeClass public static void setUp() throws Exception {
+    ColumnSchema columnSchema = new ColumnSchema();
+    columnSchema.setDataType(DataTypes.STRING);
+    CarbonDimension carbonDimension = new CarbonDimension(columnSchema, 0, 0, 0);
+    dimColumnResolvedFilterInfo.setDimension(carbonDimension);
   }
 
   public boolean checkBothTrees(Expression a, Expression b) {
@@ -320,6 +336,8 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
+
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     Assert.assertFalse(result);
   }
@@ -336,6 +354,7 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     Assert.assertFalse(result);
   }
@@ -352,6 +371,7 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     Assert.assertTrue(result);
   }
@@ -369,6 +389,7 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
 
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     rangeCovered = Deencapsulation.getField(range, "isRangeFullyCoverBlock");
@@ -389,6 +410,7 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
 
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     startBlockMinIsDefaultStart = Deencapsulation.getField(range, "startBlockMinIsDefaultStart");
@@ -409,6 +431,7 @@ public class RangeFilterProcessorTest {
     Deencapsulation.setField(range, "isDimensionPresentInCurrentBlock", true);
     Deencapsulation.setField(range, "lessThanExp", true);
     Deencapsulation.setField(range, "greaterThanExp", true);
+    Deencapsulation.setField(range, "dimColEvaluatorInfo", dimColumnResolvedFilterInfo);
 
     result = range.isScanRequired(BlockMin, BlockMax, filterMinMax, true);
     endBlockMaxisDefaultEnd = Deencapsulation.getField(range, "endBlockMaxisDefaultEnd");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
index 3823460..4734abd 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
@@ -144,7 +144,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
           || indexColumns.get(indexColIdx).hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         indexValue = convertDictionaryValue(indexColIdx, value);
       } else {
-        indexValue = convertNonDictionaryValue(indexColIdx, (byte[]) value);
+        indexValue = convertNonDictionaryValue(indexColIdx, value);
       }
     }
     if (indexValue.length == 0) {
@@ -155,7 +155,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
 
   protected abstract byte[] convertDictionaryValue(int indexColIdx, Object value);
 
-  protected abstract byte[] convertNonDictionaryValue(int indexColIdx, byte[] value);
+  protected abstract byte[] convertNonDictionaryValue(int indexColIdx, Object value);
 
   private void initDataMapFile() throws IOException {
     if (!FileFactory.isFileExist(dataMapPath)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 27911ca..2e2d94b 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -62,6 +62,7 @@ import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
@@ -343,8 +344,18 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       // for dictionary/date columns, convert the surrogate key to bytes
       internalFilterValue = CarbonUtil.getValueAsBytes(DataTypes.INT, convertedValue);
     } else {
-      // for non dictionary dimensions, is already bytes,
-      internalFilterValue = (byte[]) convertedValue;
+      // for non dictionary dimensions, numeric columns hold the original data,
+      // so convert the data to bytes
+      if (DataTypeUtil.isPrimitiveColumn(carbonColumn.getDataType())) {
+        if (convertedValue == null) {
+          convertedValue = DataConvertUtil.getNullValueForMeasure(carbonColumn.getDataType(),
+              carbonColumn.getColumnSchema().getScale());
+        }
+        internalFilterValue =
+            CarbonUtil.getValueAsBytes(carbonColumn.getDataType(), convertedValue);
+      } else {
+        internalFilterValue = (byte[]) convertedValue;
+      }
     }
     if (internalFilterValue.length == 0) {
       internalFilterValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;


[4/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

Posted by ma...@apache.org.
[CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

Loading configurations and settings
(1) Parse the data like that of a measure, so change FieldEncoderFactory to take up the measure flow (a sketch of this routing follows the list)
(2) While creating the load configuration, no dictionary and sort columns should be taken care of in all the needed flows
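
A minimal sketch of the routing decision described above, using only the DataTypeUtil helper this commit relies on; the class and method names here are hypothetical, not the committed FieldEncoderFactory code:

  import org.apache.carbondata.core.metadata.datatype.DataType;
  import org.apache.carbondata.core.util.DataTypeUtil;

  // Hypothetical illustration: a no dictionary column of a primitive data
  // type is parsed through the measure-style (typed) flow instead of the
  // string/byte[] flow.
  final class ConverterRouting {
    static boolean useMeasureFlow(DataType dataType, boolean isDictionaryColumn) {
      return !isDictionaryColumn && DataTypeUtil.isPrimitiveColumn(dataType);
    }
  }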

Sort rows preparation
(1) Prepare the row to be sorted with original data for no dictionary columns
(2) Use data type based comparators for the no dictionary sort columns in all the flows like intermediate sort, final sort and unsafe sort (see the comparator sketch after this list)
(3) Handle reading and writing of rows with no dictionary primitive data types to intermediate files and in the final file merger, as the data is now read and written as original values
(4) Get the no dictionary sort data types from the load configuration set in the LOAD step
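
A minimal sketch of the data type based comparison, reusing the SerializableComparator utility this commit also uses for the inverted index:

  import org.apache.carbondata.core.metadata.datatype.DataTypes;
  import org.apache.carbondata.core.util.comparator.Comparator;
  import org.apache.carbondata.core.util.comparator.SerializableComparator;

  public class NoDictSortCompare {
    public static void main(String[] args) {
      // compare typed values, not their byte[] form: 9 < 10 as INT,
      // although "9" sorts after "10" when compared as bytes
      SerializableComparator comparator = Comparator.getComparator(DataTypes.INT);
      System.out.println(comparator.compare(9, 10) < 0);  // true
    }
  }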

Adding to Column page and apply adaptive encoding
(1) Add the no dictionary primitive data type values to the page as original data
(2) Apply adaptive encoding to the page (the path selection is sketched after this list)
(3) Reuse the adaptive encoding techniques existing for measure columns
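
The new selection rule added to DefaultEncodingFactory (see its diff further below), condensed into a sketch:

  import org.apache.carbondata.core.datastore.ColumnType;
  import org.apache.carbondata.core.datastore.TableSpec;
  import org.apache.carbondata.core.util.DataTypeUtil;

  final class EncodingPath {
    // measures and no dictionary primitive (PLAIN_VALUE) columns share the
    // measure encoding path, so the adaptive codecs are reused for both
    static boolean takesMeasureEncodingPath(TableSpec.ColumnSpec spec) {
      return spec instanceof TableSpec.MeasureSpec
          || (DataTypeUtil.isPrimitiveColumn(spec.getSchemaDataType())
              && spec.getColumnType() == ColumnType.PLAIN_VALUE);
    }
  }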

Writing inverted index to adaptive encoded page
(1) Prepare the inverted index list based on data type aware comparison (see the usage sketch after this list)
(2) Apply RLE on the inverted index
(3) Write the inverted index to the encoded page
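
A small usage sketch of the new BlockIndexerStorageForNoDictionary added below; the expected outputs in the comments follow from the committed code:

  import java.util.Arrays;

  import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoDictionary;
  import org.apache.carbondata.core.metadata.datatype.DataTypes;

  public class InvertedIndexSketch {
    public static void main(String[] args) {
      Object[] page = new Object[] {30, 10, 20};
      BlockIndexerStorageForNoDictionary storage =
          new BlockIndexerStorageForNoDictionary(page, DataTypes.INT, true);
      // the data page is sorted by the INT comparator: [10, 20, 30]
      System.out.println(Arrays.toString(storage.getDataPage()));
      // the row id page maps sorted positions to original rows: [1, 2, 0]
      System.out.println(Arrays.toString(storage.getRowIdPage()));
    }
  }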

Create decoder while querying
(1) Create the proper decoder for the no dictionary column pages
(2) Uncompress the column page and also the inverted index (sketched after this list)
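
A condensed sketch of the decode-side inverted index handling added in CompressedDimensionChunkFileBasedReaderV3 (not the full decode path):

  import java.nio.ByteBuffer;

  import org.apache.carbondata.core.util.CarbonUtil;
  import org.apache.carbondata.format.DataChunk2;
  import org.apache.carbondata.format.Encoding;

  final class InvertedIndexDecode {
    // returns {invertedIndex, invertedReverseIndex}; both empty when the
    // page was written without an inverted index
    static int[][] readInvertedIndex(DataChunk2 pageMetadata, ByteBuffer pageData, int offset) {
      int[] invertedIndex = new int[0];
      int[] invertedReverseIndex = new int[0];
      if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
        // the row id page is laid out right after the data page
        offset += pageMetadata.data_page_length;
        invertedIndex = CarbonUtil.getUnCompressColumnIndex(
            pageMetadata.rowid_page_length, pageData, offset);
        invertedReverseIndex = CarbonUtil.getInvertedReverseIndex(invertedIndex);
      }
      return new int[][] {invertedIndex, invertedReverseIndex};
    }
  }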

Filter flow changes
(1) FilterValues will be in bytes, so convert the data to bytes for comparison (see the sketch after this list)
(2) Change isScanRequired to compare min/max values based on the data type
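
A minimal sketch of the byte conversion for a typed filter literal, using the same helpers the filter executors in this commit use:

  import org.apache.carbondata.core.metadata.datatype.DataTypes;
  import org.apache.carbondata.core.util.ByteUtil;
  import org.apache.carbondata.core.util.CarbonUtil;

  public class FilterBytesSketch {
    public static void main(String[] args) {
      // the typed literal is converted to bytes once, then compared
      // against page contents with the unsafe byte comparator
      byte[] filterValue = CarbonUtil.getValueAsBytes(DataTypes.INT, 42);
      byte[] pageValue = CarbonUtil.getValueAsBytes(DataTypes.INT, 42);
      System.out.println(
          ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageValue, filterValue));  // 0
    }
  }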

Fill output row in case of queries
(1) Change noDictionaryKeys to Object, as they can now hold typed data for no dictionary primitive data types (illustrated after this list)
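
A small illustration of the typed output row; the three-column layout here is hypothetical:

  public class TypedOutputRow {
    public static void main(String[] args) {
      // previously byte[][]; now Object[] so primitive no dictionary
      // columns keep their typed values (hypothetical column layout)
      Object[] noDictionaryKeys = new Object[] {
          "a-string-column",             // STRING stays string/bytes based
          Integer.valueOf(42),           // INT kept as int
          Long.valueOf(1537250000000L)   // TIMESTAMP kept as long
      };
      for (Object key : noDictionaryKeys) {
        System.out.println(key.getClass().getSimpleName() + ": " + key);
      }
    }
  }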

Bloom filter changes
(1) Change the bloom filter load
(2) While rebuilding the data map, the load expects the original data, so a conversion is added
(3) Fill the no dictionary primitive data as original data (the key conversion is sketched after this list)
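
The key conversion, condensed from the BloomCoarseGrainDataMap change in this commit (the null handling via DataConvertUtil is omitted in this sketch):

  import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
  import org.apache.carbondata.core.util.CarbonUtil;
  import org.apache.carbondata.core.util.DataTypeUtil;

  final class BloomKeyConversion {
    // primitive no dictionary values are converted to bytes before
    // probing the bloom filter; string-like values are already byte[]
    static byte[] toInternalFilterValue(CarbonColumn column, Object convertedValue) {
      if (DataTypeUtil.isPrimitiveColumn(column.getDataType())) {
        return CarbonUtil.getValueAsBytes(column.getDataType(), convertedValue);
      }
      return (byte[]) convertedValue;
    }
  }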

Compaction Changes
Compaction gets its rows from the result collectors, but the collectors return no dictionary columns as bytes.
So a conversion is needed to turn the bytes back into original data based on the data type (see the sketch below).
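
A minimal sketch of that reverse conversion, assuming the XOR byte layout ByteUtil uses for these values (ByteUtil.toXorBytes(int) is assumed to exist alongside the toXorInt reader shown in the diffs):

  import org.apache.carbondata.core.util.ByteUtil;

  public class BytesToValueSketch {
    public static void main(String[] args) {
      // collector output: an INT no dictionary column as bytes
      byte[] bytes = ByteUtil.toXorBytes(42);
      // compaction side: convert back to the typed value for sorting
      int value = ByteUtil.toXorInt(bytes, 0, bytes.length);
      System.out.println(value);  // 42
    }
  }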

This closes #2654


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c8f70630
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c8f70630
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c8f70630

Branch: refs/heads/master
Commit: c8f7063048115d161de539cf277cc1ccb015159b
Parents: 61fcdf2
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Aug 22 12:45:44 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Sep 18 19:12:56 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datastore/TableSpec.java    |  17 +
 ...mpressedDimensionChunkFileBasedReaderV3.java |  42 +-
 .../chunk/store/ColumnPageWrapper.java          | 106 ++++-
 ...feVariableLengthDimensionDataChunkStore.java |  28 +-
 .../datastore/columnar/BlockIndexerStorage.java | 104 +++++
 .../BlockIndexerStorageForNoDictionary.java     | 116 ++++++
 ...ndexerStorageForNoInvertedIndexForShort.java |  17 +-
 .../columnar/BlockIndexerStorageForShort.java   |  71 +---
 .../ColumnWithRowIdForNoDictionary.java         |  72 ++++
 .../core/datastore/columnar/IndexStorage.java   |  35 --
 .../page/encoding/ColumnPageEncoder.java        |  48 ++-
 .../page/encoding/DefaultEncodingFactory.java   |  70 +++-
 .../page/encoding/EncodingFactory.java          |  18 +-
 .../page/encoding/adaptive/AdaptiveCodec.java   | 195 ++++++++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  31 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  30 +-
 .../adaptive/AdaptiveFloatingCodec.java         |  30 +-
 .../adaptive/AdaptiveIntegralCodec.java         |  30 +-
 .../legacy/ComplexDimensionIndexCodec.java      |   4 +-
 .../legacy/DictDimensionIndexCodec.java         |   4 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   4 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   4 +-
 .../dimension/legacy/IndexStorageEncoder.java   |   8 +-
 .../core/datastore/page/key/TablePageKey.java   |   3 +-
 .../page/statistics/TablePageStatistics.java    |  14 +-
 .../core/datastore/row/WriteStepRowUtil.java    |  28 +-
 .../core/scan/executor/util/QueryUtil.java      |  36 ++
 .../carbondata/core/scan/filter/FilterUtil.java |  42 +-
 .../executer/ExcludeFilterExecuterImpl.java     |   2 +-
 .../executer/IncludeFilterExecuterImpl.java     |  58 ++-
 .../executer/RangeValueFilterExecuterImpl.java  |  39 +-
 .../executer/RestructureEvaluatorImpl.java      |   4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  10 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  55 ++-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  55 ++-
 ...velRangeLessThanEqualFilterExecuterImpl.java |  53 ++-
 ...RowLevelRangeLessThanFilterExecuterImpl.java |  53 ++-
 .../RowLevelRangeFilterResolverImpl.java        |   2 +-
 .../core/scan/result/BlockletScannedResult.java |   5 +-
 .../result/impl/FilterQueryScannedResult.java   |   9 +-
 .../carbondata/core/util/CarbonUnsafeUtil.java  |  95 +++++
 .../apache/carbondata/core/util/CarbonUtil.java |  22 +
 .../carbondata/core/util/DataTypeUtil.java      |  58 ++-
 .../carbondata/core/util/NonDictionaryUtil.java |   2 +-
 .../page/encoding/TestEncodingFactory.java      |  17 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |   6 +-
 .../core/util/RangeFilterProcessorTest.java     |  23 ++
 .../bloom/AbstractBloomDataMapWriter.java       |   4 +-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  15 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |   9 +-
 .../datamap/bloom/BloomDataMapWriter.java       |  10 +-
 .../hadoop/testutil/StoreCreator.java           |  39 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   4 +-
 .../datasource/SparkCarbonDataSourceTest.scala  |   2 +-
 .../datamap/IndexDataMapRebuildRDD.scala        |  13 +-
 .../CarbonGetTableDetailComandTestCase.scala    |   6 +-
 .../loading/CarbonDataLoadConfiguration.java    |  40 ++
 .../converter/impl/FieldEncoderFactory.java     |   6 +
 .../impl/MeasureFieldConverterImpl.java         |  40 +-
 .../partition/impl/RawRowComparator.java        |  30 +-
 .../loading/row/IntermediateSortTempRow.java    |   8 +-
 .../loading/sort/SortStepRowHandler.java        | 411 ++++++++++++++-----
 .../unsafe/comparator/UnsafeRowComparator.java  |  58 ++-
 .../holder/UnsafeFinalMergePageHolder.java      |   7 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |   3 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   3 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |   6 +-
 .../steps/DataConverterProcessorStepImpl.java   |   9 +-
 .../InputProcessorStepWithNoConverterImpl.java  |   9 +-
 .../merger/CompactionResultSortProcessor.java   |  30 +-
 .../merger/RowResultMergerProcessor.java        |   6 +-
 .../partition/spliter/RowResultProcessor.java   |   7 +-
 .../IntermediateSortTempRowComparator.java      |  34 +-
 .../sort/sortdata/NewRowComparator.java         |  48 ++-
 .../processing/sort/sortdata/SortDataRows.java  |   7 +-
 .../sort/sortdata/SortParameters.java           |  94 ++++-
 .../sort/sortdata/SortTempFileChunkHolder.java  |   2 +-
 .../sort/sortdata/TableFieldStat.java           |  34 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  13 +-
 .../store/CarbonFactDataHandlerModel.java       |  25 ++
 .../carbondata/processing/store/TablePage.java  |  62 ++-
 .../util/CarbonDataProcessorUtil.java           |  80 +++-
 82 files changed, 2328 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index bded430..a26d6ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -104,6 +104,23 @@ public class TableSpec {
     }
   }
 
+  /**
+   * No dictionary and complex dimensions of the table
+   *
+   * @return the no dictionary and complex dimension specs of the table
+   */
+  public DimensionSpec[] getNoDictAndComplexDimensions() {
+    List<DimensionSpec> noDictAndComplexDimensions = new ArrayList<>();
+    for (int i = 0; i < dimensionSpec.length; i++) {
+      if (dimensionSpec[i].getColumnType() == ColumnType.PLAIN_VALUE
+          || dimensionSpec[i].getColumnType() == ColumnType.COMPLEX_PRIMITIVE
+          || dimensionSpec[i].getColumnType() == ColumnType.COMPLEX) {
+        noDictAndComplexDimensions.add(dimensionSpec[i]);
+      }
+    }
+    return noDictAndComplexDimensions.toArray(new DimensionSpec[noDictAndComplexDimensions.size()]);
+  }
+
   public DimensionSpec getDimensionSpec(int dimensionIndex) {
     return dimensionSpec[dimensionIndex];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 486cc2d..b96e52e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -237,41 +237,39 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
         .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
   }
 
-  private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
-    List<Encoding> encodings = pageMetadata.getEncoders();
-    if (encodings != null && encodings.size() == 1) {
-      Encoding encoding = encodings.get(0);
-      switch (encoding) {
-        case DIRECT_COMPRESS:
-        case DIRECT_STRING:
-        case ADAPTIVE_INTEGRAL:
-        case ADAPTIVE_DELTA_INTEGRAL:
-        case ADAPTIVE_FLOATING:
-        case ADAPTIVE_DELTA_FLOATING:
-          return true;
-      }
-    }
-    return false;
-  }
-
   protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
       ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
       throws IOException, MemoryException {
-    if (isEncodedWithMeta(pageMetadata)) {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    if (CarbonUtil.isEncodedWithMeta(encodings)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
           null != rawColumnPage.getLocalDictionary());
       decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
-      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
-          isEncodedWithAdaptiveMeta(pageMetadata));
+      int[] invertedIndexes = new int[0];
+      int[] invertedIndexesReverse = new int[0];
+      // for no dictionary columns of primitive data types, if the column is included
+      // in sort columns then the inverted index has to be uncompressed
+      if (encodings.contains(Encoding.INVERTED_INDEX)) {
+        offset += pageMetadata.data_page_length;
+        if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
+          invertedIndexes = CarbonUtil
+              .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+          // get the reverse index
+          invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+        }
+      }
+      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
+          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
+          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
     } else {
       // following code is for backward compatibility
       return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
     }
   }
 
-  private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
+  public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
     List<Encoding> encodings = pageMetadata.getEncoders();
-    if (encodings != null && encodings.size() == 1) {
+    if (encodings != null && !encodings.isEmpty()) {
       Encoding encoding = encodings.get(0);
       switch (encoding) {
         case ADAPTIVE_INTEGRAL:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 65991a5..176a3e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -24,10 +24,13 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 
 public class ColumnPageWrapper implements DimensionColumnPage {
@@ -36,14 +39,23 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   private CarbonDictionary localDictionary;
 
-  private boolean isAdaptiveComplexPrimitivePage;
+  private boolean isAdaptivePrimitivePage;
+
+  private int[] invertedIndex;
+
+  private int[] invertedReverseIndex;
+
+  private boolean isExplicitSorted;
 
   public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary,
-      boolean isAdaptiveComplexPrimitivePage) {
+      int[] invertedIndex, int[] invertedReverseIndex, boolean isAdaptivePrimitivePage,
+      boolean isExplicitSorted) {
     this.columnPage = columnPage;
     this.localDictionary = localDictionary;
-    this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage;
-
+    this.invertedIndex = invertedIndex;
+    this.invertedReverseIndex = invertedReverseIndex;
+    this.isAdaptivePrimitivePage = isAdaptivePrimitivePage;
+    this.isExplicitSorted = isExplicitSorted;
   }
 
   @Override
@@ -58,26 +70,79 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   @Override
   public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
-    throw new UnsupportedOperationException("internal error");
+    ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      fillRow(i, vector, vectorOffset++);
+    }
+    return chunkIndex + 1;
+  }
+
+  /**
+   * Fill the data to the vector
+   *
+   * @param rowId
+   * @param vector
+   * @param vectorRow
+   */
+  private void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    if (columnPage.getNullBits().get(rowId)
+        && columnPage.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+      // this row is null: fill the member default value bytes into the vector
+      byte[] value = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
+    } else if (columnPage.getNullBits().get(rowId)) {
+      // this row is null: fill the empty byte array null default into the vector
+      byte[] value = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
+    } else {
+      if (isExplicitSorted) {
+        rowId = invertedReverseIndex[rowId];
+      }
+      byte[] value = getChunkData(rowId, true);
+      int length = value.length;
+      QueryUtil.putDataToVector(vector, value, vectorRow, length);
+    }
   }
 
   @Override
   public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int chunkIndex) {
-    throw new UnsupportedOperationException("internal error");
+    ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      fillRow(filteredRowId[i], vector, vectorOffset++);
+    }
+    return chunkIndex + 1;
   }
 
   @Override public byte[] getChunkData(int rowId) {
+    return getChunkData(rowId, false);
+  }
+
+  private byte[] getChunkData(int rowId, boolean isRowIdChanged) {
     ColumnType columnType = columnPage.getColumnSpec().getColumnType();
     DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
     DataType targetDataType = columnPage.getDataType();
     if (null != localDictionary) {
       return localDictionary
           .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
-    } else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) {
-      if (columnPage.getNullBits().get(rowId)) {
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptivePrimitive()) || (
+        columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
+          && columnType == ColumnType.COMPLEX_PRIMITIVE) {
         // if this row is null, return default null represent in byte array
         return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       }
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        // if this row is null, return the empty byte array null default
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
       if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
         double doubleData = columnPage.getDouble(rowId);
         if (srcDataType == DataTypes.FLOAT) {
@@ -118,15 +183,20 @@ public class ColumnPageWrapper implements DimensionColumnPage {
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
-    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) {
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptivePrimitive())) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
       if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
         byte[] out = new byte[1];
         out[0] = (columnPage.getByte(rowId));
-        return out;
+        return ByteUtil.toBytes(ByteUtil.toBoolean(out));
       } else if (srcDataType == DataTypes.BYTE_ARRAY) {
         return columnPage.getBytes(rowId);
-      }  else if (srcDataType == DataTypes.DOUBLE) {
+      } else if (srcDataType == DataTypes.DOUBLE) {
         return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
+      } else if (srcDataType == targetDataType) {
+        return columnPage.getBytes(rowId);
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
@@ -135,15 +205,14 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     }
   }
 
-
   @Override
   public int getInvertedIndex(int rowId) {
-    throw new UnsupportedOperationException("internal error");
+    return invertedIndex[rowId];
   }
 
   @Override
   public int getInvertedReverseIndex(int rowId) {
-    throw new UnsupportedOperationException("internal error");
+    return invertedReverseIndex[rowId];
   }
 
   @Override
@@ -153,12 +222,13 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   @Override
   public boolean isExplicitSorted() {
-    return false;
+    return isExplicitSorted;
   }
 
   @Override
   public int compareTo(int rowId, byte[] compareValue) {
-    throw new UnsupportedOperationException("internal error");
+    byte[] chunkData = this.getChunkData(rowId);
+    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
   }
 
   @Override
@@ -169,8 +239,8 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     }
   }
 
-  public boolean isAdaptiveComplexPrimitive() {
-    return isAdaptiveComplexPrimitivePage;
+  public boolean isAdaptivePrimitive() {
+    return isAdaptivePrimitivePage;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
index 954cab2..15217b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
@@ -21,11 +21,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Below class is responsible to store variable length dimension data chunk in
@@ -236,28 +233,7 @@ public abstract class UnsafeVariableLengthDimensionDataChunkStore
     }
     // get the row from unsafe
     fillRowInternal(length, value, currentDataOffset);
-    DataType dt = vector.getType();
-    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
-        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
-      vector.putNull(vectorRow);
-    } else {
-      if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, 0, length, value);
-      } else if (dt == DataTypes.BOOLEAN) {
-        vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
-      } else if (dt == DataTypes.SHORT) {
-        vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
-      } else if (dt == DataTypes.INT) {
-        vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
-      } else if (dt == DataTypes.LONG) {
-        vector.putLong(vectorRow,
-            DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
-                length));
-      } else if (dt == DataTypes.TIMESTAMP) {
-        vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
-      }
-    }
+    QueryUtil.putDataToVector(vector, value, vectorRow, length);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
new file mode 100644
index 0000000..6f3f139
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public abstract class BlockIndexerStorage<T> {
+
+  public abstract short[] getRowIdPage();
+
+  public abstract int getRowIdPageLengthInBytes();
+
+  public abstract short[] getRowIdRlePage();
+
+  public abstract int getRowIdRlePageLengthInBytes();
+
+  public abstract T getDataPage();
+
+  public abstract short[] getDataRlePage();
+
+  public abstract int getDataRlePageLengthInBytes();
+
+  /**
+   * Compresses the rowIds based on runs of consecutive numbers.
+   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,4]:
+   * the first array stores the start and end of each sequential run (isolated
+   * values are kept as-is) and the second array stores the positions in the
+   * first array where each run starts. If there are no sequential numbers,
+   * the original array is returned and the second array is empty.
+   *
+   * @param rowIds
+   */
+  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
+      short[] rowIdRlePage) {
+    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    int k = 0;
+    int i = 1;
+    for (; i < rowIds.length; i++) {
+      if (rowIds[i] - rowIds[i - 1] == 1) {
+        k++;
+      } else {
+        if (k > 0) {
+          map.add(((short) list.size()));
+          list.add(rowIds[i - k - 1]);
+          list.add(rowIds[i - 1]);
+        } else {
+          list.add(rowIds[i - 1]);
+        }
+        k = 0;
+      }
+    }
+    if (k > 0) {
+      map.add(((short) list.size()));
+      list.add(rowIds[i - k - 1]);
+      list.add(rowIds[i - 1]);
+    } else {
+      list.add(rowIds[i - 1]);
+    }
+    int compressionPercentage = (((list.size() + map.size()) * 100) / rowIds.length);
+    if (compressionPercentage > 70) {
+      rowIdPage = rowIds;
+    } else {
+      rowIdPage = convertToArray(list);
+    }
+    if (rowIds.length == rowIdPage.length) {
+      rowIdRlePage = new short[0];
+    } else {
+      rowIdRlePage = convertToArray(map);
+    }
+    Map<String, short[]> rowIdAndRowRleIdPages = new HashMap<>(2);
+    rowIdAndRowRleIdPages.put("rowIdPage", rowIdPage);
+    rowIdAndRowRleIdPages.put("rowRlePage", rowIdRlePage);
+    return rowIdAndRowRleIdPages;
+  }
+
+  protected short[] convertToArray(List<Short> list) {
+    short[] shortArray = new short[list.size()];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
+  }
+}
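
An illustrative trace of rleEncodeOnRowId above, written as a standalone re-implementation for clarity (not the committed code):

  import java.util.ArrayList;
  import java.util.List;

  public class RowIdRleTrace {
    public static void main(String[] args) {
      short[] rowIds = {1, 2, 3, 4, 6, 8, 10, 11, 12, 13};
      List<Short> out = new ArrayList<>();
      List<Short> runStarts = new ArrayList<>();
      int i = 0;
      while (i < rowIds.length) {
        int j = i;
        while (j + 1 < rowIds.length && rowIds[j + 1] - rowIds[j] == 1) {
          j++;
        }
        if (j > i) {                       // a sequential run: store bounds
          runStarts.add((short) out.size());
          out.add(rowIds[i]);
          out.add(rowIds[j]);
        } else {                           // an isolated value: keep as-is
          out.add(rowIds[i]);
        }
        i = j + 1;
      }
      System.out.println(out);        // [1, 4, 6, 8, 10, 13]
      System.out.println(runStarts);  // [0, 4]
    }
  }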

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
new file mode 100644
index 0000000..b3e25d3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Object[]> {
+
+  private short[] rowIdPage;
+
+  private short[] rowIdRlePage;
+
+  private Object[] dataPage;
+
+  private DataType dataType;
+
+  public BlockIndexerStorageForNoDictionary(Object[] dataPage, DataType dataType,
+      boolean isSortRequired) {
+    this.dataType = dataType;
+    ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId = createColumnWithRowId(dataPage);
+    if (isSortRequired) {
+      Arrays.sort(dataWithRowId);
+    }
+    short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
+    Map<String, short[]> rowIdAndRleRowIdPages =
+        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
+    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
+  }
+
+  /**
+   * Pair each column value with its respective rowId
+   *
+   * @return
+   */
+  private ColumnWithRowIdForNoDictionary<Short>[] createColumnWithRowId(Object[] dataPage) {
+    ColumnWithRowIdForNoDictionary<Short>[] columnWithIndexs =
+        new ColumnWithRowIdForNoDictionary[dataPage.length];
+    for (short i = 0; i < columnWithIndexs.length; i++) {
+      columnWithIndexs[i] = new ColumnWithRowIdForNoDictionary<>(dataPage[i], i, dataType);
+    }
+    return columnWithIndexs;
+  }
+
+  private short[] extractDataAndReturnRowId(ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId,
+      Object[] dataPage) {
+    short[] indexes = new short[dataWithRowId.length];
+    for (int i = 0; i < indexes.length; i++) {
+      indexes[i] = dataWithRowId[i].getIndex();
+      dataPage[i] = dataWithRowId[i].getColumn();
+    }
+    this.dataPage = dataPage;
+    return indexes;
+  }
+
+  /**
+   * @return the rowIdPage
+   */
+  @Override
+  public short[] getRowIdPage() {
+    return rowIdPage;
+  }
+
+  @Override
+  public int getRowIdPageLengthInBytes() {
+    if (rowIdPage != null) {
+      return rowIdPage.length * 2;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public short[] getRowIdRlePage() {
+    return rowIdRlePage;
+  }
+
+  @Override
+  public int getRowIdRlePageLengthInBytes() {
+    if (rowIdRlePage != null) {
+      return rowIdRlePage.length * 2;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override public Object[] getDataPage() {
+    return dataPage;
+  }
+
+  @Override public short[] getDataRlePage() {
+    return new short[0];
+  }
+
+  @Override public int getDataRlePageLengthInBytes() {
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index bbb3434..66fefe0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 /**
  * Below class will be used to for no inverted index
  */
-public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStorage<short[]> {
+public class BlockIndexerStorageForNoInvertedIndexForShort extends BlockIndexerStorage<byte[][]> {
 
   /**
    * column data
@@ -78,14 +78,6 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     }
   }
 
-  private short[] convertToArray(List<Short> list) {
-    short[] shortArray = new short[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
   private byte[][] convertToDataPage(List<byte[]> list) {
     byte[][] shortArray = new byte[list.size()][];
     for (int i = 0; i < shortArray.length; i++) {
@@ -98,7 +90,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return dataRlePage;
   }
 
-  @Override public int getDataRlePageLengthInBytes() {
+  public int getDataRlePageLengthInBytes() {
     if (dataRlePage != null) {
       return dataRlePage.length * 2;
     } else {
@@ -115,7 +107,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override public int getRowIdPageLengthInBytes() {
+  public int getRowIdPageLengthInBytes() {
     return 0;
   }
 
@@ -128,7 +120,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override public int getRowIdRlePageLengthInBytes() {
+  public int getRowIdRlePageLengthInBytes() {
     return 0;
   }
 
@@ -138,4 +130,5 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   public byte[][] getDataPage() {
     return dataPage;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index be6a1a7..f1b9af2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -19,11 +19,12 @@ package org.apache.carbondata.core.datastore.columnar;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.ByteUtil;
 
-public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
 
   private boolean alreadySorted;
 
@@ -42,7 +43,10 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    rleEncodeOnRowId(rowIds);
+    Map<String, short[]> rowIdAndRleRowIdPages =
+        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
+    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
     if (rleOnData) {
       rleEncodeOnData(dataWithRowId);
     }
@@ -80,66 +84,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
   }
 
   /**
-   * It compresses depends up on the sequence numbers.
-   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]. In
-   * first array the start and end of sequential numbers and second array
-   * keeps the indexes of where sequential numbers starts. If there is no
-   * sequential numbers then the same array it returns with empty second
-   * array.
-   *
-   * @param rowIds
-   */
-  private void rleEncodeOnRowId(short[] rowIds) {
-    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    int k = 0;
-    int i = 1;
-    for (; i < rowIds.length; i++) {
-      if (rowIds[i] - rowIds[i - 1] == 1) {
-        k++;
-      } else {
-        if (k > 0) {
-          map.add(((short) list.size()));
-          list.add(rowIds[i - k - 1]);
-          list.add(rowIds[i - 1]);
-        } else {
-          list.add(rowIds[i - 1]);
-        }
-        k = 0;
-      }
-    }
-    if (k > 0) {
-      map.add(((short) list.size()));
-      list.add(rowIds[i - k - 1]);
-      list.add(rowIds[i - 1]);
-    } else {
-      list.add(rowIds[i - 1]);
-    }
-    int compressionPercentage = (((list.size() + map.size()) * 100) / rowIds.length);
-    if (compressionPercentage > 70) {
-      rowIdPage = rowIds;
-    } else {
-      rowIdPage = convertToArray(list);
-    }
-    if (rowIds.length == rowIdPage.length) {
-      rowIdRlePage = new short[0];
-    } else {
-      rowIdRlePage = convertToArray(map);
-    }
-    if (rowIdPage.length == 2 && rowIdRlePage.length == 1) {
-      alreadySorted = true;
-    }
-  }
-
-  private short[] convertToArray(List<Short> list) {
-    short[] shortArray = new short[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  /**
    * @return the alreadySorted
    */
   public boolean isAlreadySorted() {
@@ -153,7 +97,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return rowIdPage;
   }
 
-  @Override
   public int getRowIdPageLengthInBytes() {
     if (rowIdPage != null) {
       return rowIdPage.length * 2;
@@ -169,7 +112,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return rowIdRlePage;
   }
 
-  @Override
   public int getRowIdRlePageLengthInBytes() {
     if (rowIdRlePage != null) {
       return rowIdRlePage.length * 2;
@@ -234,6 +176,7 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return shortArray;
   }
 
+  @Override
   public short[] getDataRlePage() {
     return dataRlePage;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
new file mode 100644
index 0000000..affef97
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+public class ColumnWithRowIdForNoDictionary<T>
+    implements Comparable<ColumnWithRowIdForNoDictionary<T>> {
+
+  Object column;
+
+  T index;
+
+  DataType dataType;
+
+  ColumnWithRowIdForNoDictionary(Object column, T index, DataType dataType) {
+    this.column = column;
+    this.index = index;
+    this.dataType = dataType;
+  }
+
+  @Override public int compareTo(ColumnWithRowIdForNoDictionary o) {
+    // use the data type based comparator for the no dictionary encoded columns
+    SerializableComparator comparator =
+        org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
+    return comparator.compare(column, o.column);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ColumnWithRowIdForNoDictionary o = (ColumnWithRowIdForNoDictionary)obj;
+    return column.equals(o.column) && getIndex() == o.getIndex();
+  }
+
+  @Override public int hashCode() {
+    return getColumn().hashCode() + getIndex().hashCode();
+  }
+
+  /**
+   * @return the index
+   */
+  public T getIndex() {
+    return index;
+  }
+
+
+  /**
+   * @return the column
+   */
+  public Object getColumn() {
+    return column;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
deleted file mode 100644
index a30ea88..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-public interface IndexStorage<T> {
-
-  T getRowIdPage();
-
-  int getRowIdPageLengthInBytes();
-
-  T getRowIdRlePage();
-
-  int getRowIdRlePageLengthInBytes();
-
-  byte[][] getDataPage();
-
-  T getDataRlePage();
-
-  int getDataRlePageLengthInBytes();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 5b560ab..3067823 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -34,9 +34,11 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -62,6 +64,21 @@ public abstract class ColumnPageEncoder {
   protected abstract ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage);
 
   /**
+   * Get the target data type of the page if encoded
+   *
+   * @param inputPage the column page to be encoded
+   * @return the target (store) data type, or null if there is no encoder meta
+   */
+  public DataType getTargetDataType(ColumnPage inputPage) {
+    ColumnPageEncoderMeta encoderMeta = getEncoderMeta(inputPage);
+    if (null != encoderMeta) {
+      return encoderMeta.getStoreDataType();
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Return a encoded column page by encoding the input page
    * The encoded binary data and metadata are wrapped in encoding column page
    */
@@ -118,17 +135,28 @@ public abstract class ColumnPageEncoder {
   }
 
   private void fillMinMaxIndex(ColumnPage inputPage, DataChunk2 dataChunk) {
-    dataChunk.setMin_max(buildMinMaxIndex(inputPage));
+    dataChunk.setMin_max(buildMinMaxIndex(inputPage, dataChunk.encoders));
   }
 
-  private BlockletMinMaxIndex buildMinMaxIndex(ColumnPage inputPage) {
+  private BlockletMinMaxIndex buildMinMaxIndex(ColumnPage inputPage, List<Encoding> encoders) {
     BlockletMinMaxIndex index = new BlockletMinMaxIndex();
-    byte[] bytes = CarbonUtil.getValueAsBytes(
-        inputPage.getDataType(), inputPage.getStatistics().getMax());
-    ByteBuffer max = ByteBuffer.wrap(
-        bytes);
-    ByteBuffer min = ByteBuffer.wrap(
-        CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMin()));
+    ByteBuffer max;
+    ByteBuffer min;
+    if (CarbonUtil.isEncodedWithMeta(encoders)
+        && inputPage.getColumnSpec().getColumnType() == ColumnType.PLAIN_VALUE) {
+      max = ByteBuffer.wrap(DataTypeUtil
+          .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(inputPage.getStatistics().getMax(),
+              inputPage.getDataType()));
+      min = ByteBuffer.wrap(DataTypeUtil
+          .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(inputPage.getStatistics().getMin(),
+              inputPage.getDataType()));
+    } else {
+      byte[] bytes =
+          CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMax());
+      max = ByteBuffer.wrap(bytes);
+      min = ByteBuffer.wrap(
+          CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMin()));
+    }
     index.addToMax_values(max);
     index.addToMin_values(min);
     index.addToMin_max_presence(inputPage.getStatistics().writeMinMax());
@@ -187,11 +215,11 @@ public abstract class ColumnPageEncoder {
       } else if ((inputPage.getDataType() == DataTypes.BYTE) || (inputPage.getDataType()
           == DataTypes.SHORT) || (inputPage.getDataType() == DataTypes.INT) || (
           inputPage.getDataType() == DataTypes.LONG)) {
-        return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true)
+        return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true, columnSpec)
             .createEncoder(null);
       } else if ((inputPage.getDataType() == DataTypes.FLOAT) || (inputPage.getDataType()
           == DataTypes.DOUBLE)) {
-        return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true)
+        return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true, columnSpec)
             .createEncoder(null);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 29772d1..993b6b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
@@ -36,6 +37,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Default factory will select encoding base on column page data type and statistics
@@ -57,8 +59,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
   @Override
   public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
     // TODO: add log
-    if (columnSpec instanceof TableSpec.MeasureSpec) {
-      return createEncoderForMeasure(inputPage);
+    // choose the encoding type for measure columns and no dictionary primitive columns
+    if (columnSpec instanceof TableSpec.MeasureSpec || (
+        DataTypeUtil.isPrimitiveColumn(columnSpec.getSchemaDataType())
+            && columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
+      return createEncoderForMeasureOrNoDictionaryPrimitive(inputPage, columnSpec);
     } else {
       if (newWay) {
         return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
@@ -107,7 +112,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
-  private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
+  private ColumnPageEncoder createEncoderForMeasureOrNoDictionaryPrimitive(ColumnPage columnPage,
+      TableSpec.ColumnSpec columnSpec) {
     SimpleStatsResult stats = columnPage.getStatistics();
     DataType dataType = stats.getDataType();
     if (dataType == DataTypes.BOOLEAN) {
@@ -116,11 +122,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
-      return selectCodecByAlgorithmForIntegral(stats, false).createEncoder(null);
+      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec).createEncoder(null);
     } else if (DataTypes.isDecimal(dataType)) {
-      return createEncoderForDecimalDataTypeMeasure(columnPage);
+      return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
-      return selectCodecByAlgorithmForFloating(stats, false).createEncoder(null);
+      return selectCodecByAlgorithmForFloating(stats, false, columnSpec).createEncoder(null);
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
     } else {
@@ -128,13 +134,15 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
-  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage) {
+  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage,
+      TableSpec.ColumnSpec columnSpec) {
     DecimalConverterFactory.DecimalConverterType decimalConverterType =
         ((DecimalColumnPage) columnPage).getDecimalConverter().getDecimalConverterType();
     switch (decimalConverterType) {
       case DECIMAL_INT:
       case DECIMAL_LONG:
-        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType)
+        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType,
+            columnSpec)
             .createEncoder(null);
       default:
         return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
@@ -245,7 +253,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
    * size is smaller
    */
   static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats,
-      boolean isComplexPrimitive) {
+      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
 
@@ -260,19 +268,40 @@ public class DefaultEncodingFactory extends EncodingFactory {
         return new DirectCompressCodec(stats.getDataType());
       }
     }
+    boolean isInvertedIndex = isInvertedIndex(isComplexPrimitive, columnSpec);
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
-      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
+          isInvertedIndex);
     } else {
       // choose delta adaptive encoding
-      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
+          isInvertedIndex);
     }
   }
 
+  /**
+   * Check whether the column is a sort column with inverted index enabled.
+   *
+   * @param isComplexPrimitive whether the column is a complex primitive
+   * @param columnSpec column specification to check
+   * @return true only for a non-complex dimension that is both a sort
+   *         column and an inverted index column
+   */
+  private static boolean isInvertedIndex(boolean isComplexPrimitive,
+      TableSpec.ColumnSpec columnSpec) {
+    boolean isSort;
+    boolean isInvertedIndex = false;
+    if (columnSpec instanceof TableSpec.DimensionSpec && !isComplexPrimitive) {
+      isSort = ((TableSpec.DimensionSpec) columnSpec).isInSortColumns();
+      isInvertedIndex = isSort && ((TableSpec.DimensionSpec) columnSpec).isDoInvertedIndex();
+    }
+    return isInvertedIndex;
+  }
+
   // choose between upscale adaptive encoder or upscale delta adaptive encoder,
   // based on whose target data type size is smaller
   static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
-      boolean isComplexPrimitive) {
+      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     double maxValue = (double) stats.getMax();
     double minValue = (double) stats.getMin();
@@ -290,7 +319,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
     double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
     if (decimalCount == 0) {
       // short, int, long
-      return selectCodecByAlgorithmForIntegral(stats, false);
+      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec);
     } else if (decimalCount < 0 && !isComplexPrimitive) {
       return new DirectCompressCodec(DataTypes.DOUBLE);
     } else {
@@ -304,11 +333,13 @@ public class DefaultEncodingFactory extends EncodingFactory {
         DataType deltaDataType = compareMinMaxAndSelectDataType(
             (long) (Math.pow(10, decimalCount) * (maxValue - minValue)));
         if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
-          return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats);
+          return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats,
+              isInvertedIndex(isComplexPrimitive, columnSpec));
         } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes() || (
             (isComplexPrimitive) && (adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE
                 .getSizeInBytes()))) {
-          return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
+          return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats,
+              isInvertedIndex(isComplexPrimitive, columnSpec));
         } else {
           return new DirectCompressCodec(DataTypes.DOUBLE);
         }
@@ -321,7 +352,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
    * size is smaller for decimal data type
    */
   static ColumnPageCodec selectCodecByAlgorithmForDecimal(SimpleStatsResult stats,
-      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+      DecimalConverterFactory.DecimalConverterType decimalConverterType,
+      TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType =
         fitMinMaxForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
@@ -343,10 +375,12 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
-      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
+          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
     } else {
       // choose delta adaptive encoding
-      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
+          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
     }
   }
 

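Both integral codec choices above reduce to a width comparison: the smallest type that fits the raw min/max versus the smallest type that fits the deltas. A hedged sketch of that comparison, assuming "fit" means the smallest signed type that can hold the range; the real logic lives in fitMinMax and compareMinMaxAndSelectDataType.

public class CodecChoiceSketch {
  static int sizeToFit(long bound) {
    if (bound <= Byte.MAX_VALUE) return 1;
    if (bound <= Short.MAX_VALUE) return 2;
    if (bound <= Integer.MAX_VALUE) return 4;
    return 8;
  }

  static String choose(long min, long max) {
    // adaptive stores the raw values, delta stores (max - value)
    int adaptiveSize = sizeToFit(Math.max(Math.abs(min), Math.abs(max)));
    int deltaSize = sizeToFit(max - min);   // deltas lie in [0, max - min]
    return adaptiveSize <= deltaSize ? "adaptive" : "delta adaptive";
  }

  public static void main(String[] args) {
    System.out.println(choose(1_000_000L, 1_000_100L));  // delta adaptive
    System.out.println(choose(-100L, 100L));             // adaptive
  }
}
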
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index d119c8f..920a516 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,7 +66,7 @@ public abstract class EncodingFactory {
    */
   public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
       String compressor) throws IOException {
-    assert (encodings.size() == 1);
+    assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
     byte[] encoderMeta = encoderMetas.get(0).array();
@@ -81,25 +81,27 @@ public abstract class EncodingFactory {
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
-          stats).createDecoder(metadata);
+          stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
-          metadata.getStoreDataType(), stats).createDecoder(metadata);
+          metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX))
+          .createDecoder(metadata);
     } else if (encoding == ADAPTIVE_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
-          stats).createDecoder(metadata);
+          stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
-          metadata.getStoreDataType(), stats).createDecoder(metadata);
+          metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX))
+          .createDecoder(metadata);
     } else if (encoding == RLE_INTEGRAL) {
       RLEEncoderMeta metadata = new RLEEncoderMeta();
       metadata.readFields(in);
@@ -132,7 +134,7 @@ public abstract class EncodingFactory {
         dataType == DataTypes.LONG) {
       // create the codec based on algorithm and create decoder by recovering the metadata
       ColumnPageCodec codec =
-          DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false);
+          DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false, spec);
       if (codec instanceof AdaptiveIntegralCodec) {
         AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
@@ -154,7 +156,7 @@ public abstract class EncodingFactory {
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
       // create the codec based on algorithm and create decoder by recovering the metadata
       ColumnPageCodec codec =
-          DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false);
+          DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false, spec);
       if (codec instanceof AdaptiveFloatingCodec) {
         AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
@@ -180,7 +182,7 @@ public abstract class EncodingFactory {
     } else if (dataType == DataTypes.LEGACY_LONG) {
       // In case of older versions like in V1 format it has special datatype to handle
       AdaptiveIntegralCodec adaptiveCodec =
-          new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats);
+          new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
       ColumnPageEncoderMeta meta =
           new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
       return adaptiveCodec.createDecoder(meta);

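Since a page can now carry several encodings, the decoder is rebuilt from the whole list: the first entry selects the codec, and INVERTED_INDEX anywhere in the list turns on row-id handling. A standalone sketch of that dispatch; the enum below mirrors only a subset of org.apache.carbondata.format.Encoding.

import java.util.Arrays;
import java.util.List;

public class DecoderDispatchSketch {
  enum Encoding { ADAPTIVE_INTEGRAL, ADAPTIVE_DELTA_INTEGRAL, INVERTED_INDEX }

  static String createDecoder(List<Encoding> encodings) {
    assert encodings.size() >= 1;   // was == 1 before this change
    boolean invertedIndex = encodings.contains(Encoding.INVERTED_INDEX);
    return encodings.get(0) + (invertedIndex ? " + inverted index" : "");
  }

  public static void main(String[] args) {
    System.out.println(createDecoder(
        Arrays.asList(Encoding.ADAPTIVE_INTEGRAL, Encoding.INVERTED_INDEX)));
  }
}
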
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index ece5cb6..ef7a6a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -17,9 +17,24 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoDictionary;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.SortState;
 
 /**
  * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply
@@ -38,17 +53,194 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
   // the data type specified in schema
   protected final DataType srcDataType;
 
+  protected boolean isInvertedIndex;
+
+  protected BlockIndexerStorage<Object[]> indexStorage;
+
+  protected ColumnPage encodedPage;
+
   protected AdaptiveCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
+      SimpleStatsResult stats, boolean isInvertedIndex) {
     this.stats = stats;
     this.srcDataType = srcDataType;
     this.targetDataType = targetDataType;
+    this.isInvertedIndex = isInvertedIndex;
   }
 
   public DataType getTargetDataType() {
     return targetDataType;
   }
 
+  /**
+   * Convert the page data to typed values, row by row.
+   * While preparing the inverted index for the page, the typed data is
+   * needed for no dictionary primitive columns when adaptive encoding is
+   * applied. This is similar to page.getByteArrayPage().
+   *
+   * @param input column page to read from
+   * @return typed values of the page, one entry per row
+   */
+  public Object[] getPageBasedOnDataType(ColumnPage input) {
+    Object[] data = new Object[input.getActualRowCount()];
+    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getByte(i);
+      }
+    } else if (srcDataType == DataTypes.SHORT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getShort(i);
+      }
+    } else if (srcDataType == DataTypes.SHORT_INT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getShortInt(i);
+      }
+    } else if (srcDataType == DataTypes.INT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getInt(i);
+      }
+    } else if (srcDataType == DataTypes.LONG) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getLong(i);
+      }
+    } else if (srcDataType == DataTypes.FLOAT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getFloat(i);
+      }
+    } else if (srcDataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getDouble(i);
+      }
+    }
+    return data;
+  }
+
+  /**
+   * Write the typed values back to the page, row by row.
+   *
+   * @param page destination column page
+   * @param dataPage typed values to write
+   */
+  public void putDataToPage(ColumnPage page, Object[] dataPage) {
+    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putByte(i, (byte) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.SHORT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putShort(i, (short) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.SHORT_INT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putShortInt(i, (int) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.INT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putInt(i, (int) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.LONG) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putLong(i, (long) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putDouble(i, (double) dataPage[i]);
+      }
+    }
+  }
+
+  /**
+   * Append the inverted index (row id page) to the encoded data,
+   * if an inverted index was built for this page.
+   *
+   * @param result encoded and compressed data page
+   * @return data page followed by the row id page, or an empty array
+   *         when no inverted index exists
+   * @throws IOException if writing to the byte stream fails
+   */
+  public byte[] writeInvertedIndexIfRequired(byte[] result) throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    if (null != indexStorage) {
+      out.write(result);
+      if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+        out.writeInt(indexStorage.getRowIdPageLengthInBytes());
+        short[] rowIdPage = (short[]) indexStorage.getRowIdPage();
+        for (short rowId : rowIdPage) {
+          out.writeShort(rowId);
+        }
+        if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
+          short[] rowIdRlePage = (short[]) indexStorage.getRowIdRlePage();
+          for (short rowIdRle : rowIdRlePage) {
+            out.writeShort(rowIdRle);
+          }
+        }
+      }
+    }
+    byte[] bytes = stream.toByteArray();
+    stream.close();
+    return bytes;
+  }
+
+  /**
+   * Fill the legacy DataChunk2 fields (sort state and page lengths)
+   * based on the inverted index, if one was built.
+   *
+   * @param dataChunk data chunk metadata to fill
+   * @param result encoded data page
+   */
+  public void fillLegacyFieldsIfRequired(DataChunk2 dataChunk, byte[] result) {
+    if (null != indexStorage) {
+      SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ?
+          SortState.SORT_EXPLICIT :
+          SortState.SORT_NATIVE;
+      dataChunk.setSort_state(sort);
+      if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+        int rowIdPageLength =
+            CarbonCommonConstants.INT_SIZE_IN_BYTE + indexStorage.getRowIdPageLengthInBytes()
+                + indexStorage.getRowIdRlePageLengthInBytes();
+        dataChunk.setRowid_page_length(rowIdPageLength);
+      }
+    } else {
+      dataChunk.setRowid_page_length(0);
+    }
+    if (null != result) {
+      dataChunk.setData_page_length(result.length);
+    }
+  }
+
+  /**
+   * Get a new column page built from the sorted data, or the input page
+   * itself when no inverted index was built.
+   *
+   * @param input column page to sort
+   * @return sorted column page, or the unchanged input page
+   * @throws MemoryException if the new page cannot be allocated
+   */
+  public ColumnPage getSortedColumnPageIfRequired(ColumnPage input) throws MemoryException {
+    if (null != indexStorage) {
+      Object[] dataPage = indexStorage.getDataPage();
+      ColumnPageEncoderMeta columnPageEncoderMeta =
+          new ColumnPageEncoderMeta(input.getColumnSpec(), input.getDataType(),
+              input.getColumnPageEncoderMeta().getCompressorName());
+      ColumnPage columnPage = ColumnPage.newPage(columnPageEncoderMeta, input.getPageSize());
+      putDataToPage(columnPage, dataPage);
+      return columnPage;
+    } else {
+      return input;
+    }
+  }
+
+  public byte[] encodeAndCompressPage(ColumnPage input, ColumnPageValueConverter converter,
+      Compressor compressor) throws MemoryException, IOException {
+    encodedPage = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), targetDataType,
+            input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize());
+    if (isInvertedIndex) {
+      indexStorage =
+          new BlockIndexerStorageForNoDictionary(getPageBasedOnDataType(input), input.getDataType(),
+              isInvertedIndex);
+    }
+    ColumnPage columnPage = getSortedColumnPageIfRequired(input);
+    columnPage.convertValue(converter);
+    byte[] result = encodedPage.compress(compressor);
+    return result;
+  }
+
   @Override
   public String toString() {
     return String.format("%s[src type: %s, target type: %s, stats(%s)]",
@@ -58,4 +250,5 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
   protected String debugInfo() {
     return this.toString();
   }
+
 }

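writeInvertedIndexIfRequired appends the row-id page after the compressed data: an int length, then the row ids as shorts (the optional RLE page is omitted below). A self-contained sketch of that layout using plain java.io.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class InvertedIndexLayoutSketch {
  static byte[] appendRowIds(byte[] compressedData, short[] rowIds)
      throws IOException {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(stream);
    out.write(compressedData);
    out.writeInt(rowIds.length * 2);   // row-id page length in bytes
    for (short rowId : rowIds) {
      out.writeShort(rowId);
    }
    out.close();
    return stream.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] page = appendRowIds(new byte[] {1, 2, 3}, new short[] {2, 0, 1});
    System.out.println(page.length);   // 3 + 4 + 6 = 13
  }
}
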
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index bb928c2..6d0a8d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -44,18 +45,18 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private Double factor;
   private long max;
 
   public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    return new AdaptiveDeltaFloatingCodec(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    return new AdaptiveDeltaFloatingCodec(srcDataType, targetDataType, stats,
+        isInvertedIndex);
   }
 
   public AdaptiveDeltaFloatingCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
     this.max = (long) (Math.pow(10, stats.getDecimalCount()) * (double) stats.getMax());
   }
@@ -68,20 +69,20 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        input.convertValue(converter);
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             input.getColumnCompressorName());
-        byte[] result = encodedPage.compress(compressor);
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -89,6 +90,9 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_DELTA_FLOATING);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -98,6 +102,11 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index ac9693d..9ada0bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -45,12 +46,11 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private long max;
 
   public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     if (srcDataType == DataTypes.BYTE) {
       this.max = (byte) stats.getMax();
     } else if (srcDataType == DataTypes.SHORT) {
@@ -78,21 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-
+      byte[] result = null;
+      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        input.convertValue(converter);
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        byte[] result = encodedPage.compress(compressor);
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -106,9 +104,17 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<>();
         encodings.add(Encoding.ADAPTIVE_DELTA_INTEGRAL);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

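The delta variant stores (max - value), collapsing the stored numbers into a small non-negative range. A tiny worked example, assuming the deltas fit in a byte.

public class DeltaIntegralSketch {
  public static void main(String[] args) {
    long max = 1_000_005L;
    long[] values = {1_000_000L, 1_000_003L, 1_000_005L};
    byte[] stored = new byte[values.length];
    for (int i = 0; i < values.length; i++) {
      stored[i] = (byte) (max - values[i]);   // deltas 5, 2, 0 fit in a byte
    }
    for (byte delta : stored) {
      System.out.println(max - delta);        // restores the original values
    }
  }
}
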
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 028fa71..af1e9ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -43,12 +44,11 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private Double factor;
 
   public AdaptiveFloatingCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
   }
 
@@ -60,20 +60,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        input.convertValue(converter);
-        byte[] result = encodedPage.compress(compressor);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -81,6 +81,9 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_FLOATING);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -90,6 +93,11 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

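The floating codecs upscale each value by factor = 10^decimalCount and store the result as an integral. A minimal round-trip sketch; Math.round is used here for clarity, while the codec's exact conversion is defined by its ColumnPageValueConverter.

public class AdaptiveFloatingSketch {
  public static void main(String[] args) {
    int decimalCount = 2;                        // from page statistics
    double factor = Math.pow(10, decimalCount);
    double value = 12.34;
    long encoded = Math.round(value * factor);   // 1234, stored as integral
    double decoded = encoded / factor;           // 12.34
    System.out.println(encoded + " -> " + decoded);
  }
}
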
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index a9cf742..f1c0ea0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -42,11 +43,9 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
-
   public AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
   }
 
   @Override
@@ -57,20 +56,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        input.convertValue(converter);
-        byte[] result = encodedPage.compress(compressor);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -78,6 +77,9 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_INTEGRAL);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -87,6 +89,10 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
     };
   }
 


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index 29e3060..29a4098 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
@@ -61,8 +62,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D
   }
 
   @Override
-  protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
-    return value;
+  protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
+    // no dictionary primitive columns hold the original typed value, so convert it to bytes
+    if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+      return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
+    }
+    return (byte[]) value;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index cad9787..61bd036 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.Predicate;
@@ -73,11 +74,14 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
     }
   }
 
-  protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
+  protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
     if (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) {
-      return DataConvertUtil.getRawBytesForVarchar(value);
+      return DataConvertUtil.getRawBytesForVarchar((byte[]) value);
+    } else if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+      // get bytes for the original value of the no dictionary column
+      return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
     } else {
-      return DataConvertUtil.getRawBytes(value);
+      return DataConvertUtil.getRawBytes((byte[]) value);
     }
   }
 

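convertNonDictionaryValue now accepts an Object because primitive no-dictionary columns arrive as typed values instead of bytes. A hedged sketch of the three-way conversion; the byte layout shown is an assumption, since the real one comes from CarbonUtil.getValueAsBytes.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BloomValueSketch {
  static byte[] toIndexBytes(Object value, boolean isPrimitive) {
    if (isPrimitive && value instanceof Long) {
      return ByteBuffer.allocate(8).putLong((Long) value).array();
    }
    if (isPrimitive && value instanceof Integer) {
      return ByteBuffer.allocate(4).putInt((Integer) value).array();
    }
    return (byte[]) value;   // varchar / string path: already raw bytes
  }

  public static void main(String[] args) {
    System.out.println(toIndexBytes(42L, true).length);   // 8
    byte[] raw = "ab".getBytes(StandardCharsets.UTF_8);
    System.out.println(toIndexBytes(raw, false).length);  // 2
  }
}
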
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 7cd241a..5525941 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -245,7 +245,7 @@ public class StoreCreator {
     date.setEncodingList(encodings);
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
-    date.setColumnReferenceId(id.getColumnUniqueId());
+    date.setColumnReferenceId(date.getColumnUniqueId());
     date.setSchemaOrdinal(schemaOrdinal++);
     if (sortColumns.contains(date.getColumnName())) {
       date.setSortColumn(true);
@@ -263,7 +263,7 @@ public class StoreCreator {
     if (sortColumns.contains(country.getColumnName())) {
       country.setSortColumn(true);
     }
-    country.setColumnReferenceId(id.getColumnUniqueId());
+    country.setColumnReferenceId(country.getColumnUniqueId());
     columnSchemas.add(country);
 
     ColumnSchema name = new ColumnSchema();
@@ -276,7 +276,7 @@ public class StoreCreator {
     if (sortColumns.contains(name.getColumnName())) {
       name.setSortColumn(true);
     }
-    name.setColumnReferenceId(id.getColumnUniqueId());
+    name.setColumnReferenceId(name.getColumnUniqueId());
     columnSchemas.add(name);
 
     ColumnSchema phonetype = new ColumnSchema();
@@ -289,7 +289,7 @@ public class StoreCreator {
     if (sortColumns.contains(phonetype.getColumnName())) {
       phonetype.setSortColumn(true);
     }
-    phonetype.setColumnReferenceId(id.getColumnUniqueId());
+    phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
     columnSchemas.add(phonetype);
 
     ColumnSchema serialname = new ColumnSchema();
@@ -302,7 +302,7 @@ public class StoreCreator {
     if (sortColumns.contains(serialname.getColumnName())) {
       serialname.setSortColumn(true);
     }
-    serialname.setColumnReferenceId(id.getColumnUniqueId());
+    serialname.setColumnReferenceId(serialname.getColumnUniqueId());
     columnSchemas.add(serialname);
     ColumnSchema salary = new ColumnSchema();
     salary.setColumnName("salary");
@@ -310,11 +310,13 @@ public class StoreCreator {
     salary.setEncodingList(new ArrayList<Encoding>());
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);
-    salary.setColumnReferenceId(id.getColumnUniqueId());
+    salary.setColumnReferenceId(salary.getColumnUniqueId());
     salary.setSchemaOrdinal(schemaOrdinal++);
     columnSchemas.add(salary);
 
-    tableSchema.setListOfColumns(columnSchemas);
+    // rearrange the column schema based on the sort order, if sort columns exist
+    List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
+    tableSchema.setListOfColumns(columnSchemas1);
     SchemaEvolution schemaEvol = new SchemaEvolution();
     schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
     tableSchema.setSchemaEvolution(schemaEvol);
@@ -352,6 +354,29 @@ public class StoreCreator {
     return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
   }
 
+  private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
+    List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
+    // add sort columns first
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (columnSchema.isSortColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    // add other dimension columns
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    // add measure columns
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (!columnSchema.isDimensionColumn()) {
+        newColumnSchema.add(columnSchema);
+      }
+    }
+    return newColumnSchema;
+  }
+
   private void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
     BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream(factFilePath), "UTF-8"));

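A usage sketch of the rearrangement above: relative order is preserved within each bucket, with sort columns first, other dimensions next, and measures last.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReArrangeSketch {
  public static void main(String[] args) {
    // schema order: date (dimension), country (sort dimension), salary (measure)
    List<String> sortCols = Arrays.asList("country");
    List<String> dimensions = Arrays.asList("date", "country");
    List<String> schemaOrder = Arrays.asList("date", "country", "salary");

    List<String> rearranged = new ArrayList<>();
    for (String c : schemaOrder) {
      if (sortCols.contains(c)) rearranged.add(c);            // sort columns
    }
    for (String c : schemaOrder) {
      if (!sortCols.contains(c) && dimensions.contains(c)) {
        rearranged.add(c);                                    // other dimensions
      }
    }
    for (String c : schemaOrder) {
      if (!dimensions.contains(c)) rearranged.add(c);         // measures
    }
    System.out.println(rearranged);   // [country, date, salary]
  }
}
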
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index be40b13..e810829 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -86,7 +86,9 @@ object DataLoadProcessBuilderOnSpark {
     val sortParameters = SortParameters.createSortParameters(configuration)
     val rowComparator: Comparator[Array[AnyRef]] =
       if (sortParameters.getNoDictionaryCount > 0) {
-        new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+        new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn,
+          sortParameters.getNoDictionarySortColumn,
+          sortParameters.getNoDictDataType)
       } else {
         new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index c97732a..727191c 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -805,7 +805,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       model.setSegmentId("0")
       store.createCarbonStore(model)
       FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
-      store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava))
+      store.setSortColumns(new util.ArrayList[String](Seq("country","phonetype").asJava))
       model = store.createTableAndLoadModel(false)
       model.setSegmentId("1")
       store.createCarbonStore(model)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 0584fb1..1897c87 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil, TaskMetricsMap}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.datamap.bloom.DataConvertUtil
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
@@ -264,8 +264,17 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
       rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
         surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
       } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
-        data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
+        val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
           indexCol2IdxInNoDictArray(col.getColName))
+        // no dictionary primitive columns are expected to hold the original
+        // typed value while loading, so convert the bytes back to it
+        if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) {
+          val dataFromBytes = DataTypeUtil
+            .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType)
+          dataFromBytes
+        } else {
+          bytes
+        }
       } else {
         // measures start from 1
         val value = data(1 + indexCol2IdxInMeasureArray(col.getColName))

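On the rebuild path, stored no-dictionary bytes must be turned back into typed values. A minimal round-trip sketch assuming a big-endian layout; the actual conversion is owned by DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn.

import java.nio.ByteBuffer;

public class NoDictDecodeSketch {
  public static void main(String[] args) {
    byte[] stored = ByteBuffer.allocate(4).putInt(2018).array();  // write path
    int restored = ByteBuffer.wrap(stored).getInt();              // rebuild path
    System.out.println(restored);   // 2018
  }
}
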
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index ad6823d..fcb6110 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2087 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
-    assertResult(2216)(result(0).getLong(1))
+    // 2220 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
+    assertResult(2220)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2216)(result(1).getLong(1))
+    assertResult(2220)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 4d85296..616edeb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -339,6 +340,45 @@ public class CarbonDataLoadConfiguration {
     return type;
   }
 
+  /**
+   * Get the no dictionary and complex dimension columns of the table.
+   *
+   * @return no dictionary and complex dimensions, in data field order
+   */
+  public CarbonColumn[] getNoDictAndComplexDimensions() {
+    List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
+    int noDicCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (dataFields[i].getColumn().isDimension() && (
+          !(dataFields[i].getColumn().hasEncoding(Encoding.DICTIONARY)) || dataFields[i].getColumn()
+              .isComplex())) {
+        noDicOrCompIndexes.add(i);
+        noDicCount++;
+      }
+    }
+
+    CarbonColumn[] dims = new CarbonColumn[noDicCount];
+    for (int i = 0; i < dims.length; i++) {
+      dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
+    }
+    return dims;
+  }
+
+  /**
+   * Get the sort column mapping of the table.
+   *
+   * @return one flag per data field, true where the field is a sort column
+   */
+  public boolean[] getSortColumnMapping() {
+    boolean[] sortColumnMapping = new boolean[dataFields.length];
+    for (int i = 0; i < sortColumnMapping.length; i++) {
+      if (dataFields[i].getColumn().getColumnSchema().isSortColumn()) {
+        sortColumnMapping[i] = true;
+      }
+    }
+    return sortColumnMapping;
+  }
+
   public int[] calcDimensionLengths() {
     int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
     if (!isSortTable()) {

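getNoDictAndComplexDimensions keeps dimensions that either lack the dictionary encoding or are complex. A small standalone sketch of that filter over hypothetical fields.

import java.util.ArrayList;
import java.util.List;

public class NoDictFilterSketch {
  static class Field {
    final String name;
    final boolean dimension, dictionary, complex;
    Field(String name, boolean dimension, boolean dictionary, boolean complex) {
      this.name = name; this.dimension = dimension;
      this.dictionary = dictionary; this.complex = complex;
    }
  }

  public static void main(String[] args) {
    List<Field> fields = new ArrayList<>();
    fields.add(new Field("country", true, true, false));   // dictionary dim
    fields.add(new Field("name", true, false, false));     // no dict dim
    fields.add(new Field("address", true, true, true));    // complex dim
    fields.add(new Field("salary", false, false, false));  // measure
    for (Field f : fields) {
      if (f.dimension && (!f.dictionary || f.complex)) {
        System.out.println(f.name);   // prints: name, address
      }
    }
  }
}
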
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 86f273d..7dfe95f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -111,6 +112,11 @@ public class FieldEncoderFactory {
             createComplexDataType(dataField, absoluteTableIdentifier,
                 client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
       } else {
+        // if the no dictionary column is a numeric column then treat it as a measure column
+        // so that adaptive encoding can be applied to it easily
+        if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) {
+          return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+        }
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       }
     } else {

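This routing is the crux of the change: a no-dictionary column whose data type is primitive now
takes the measure conversion path, so its values stay as real numbers and adaptive encoding can
work on them later. A minimal sketch of the dispatch, with a hypothetical FieldConverter
interface standing in for the real one:

    import java.nio.charset.StandardCharsets;

    interface FieldConverter {          // illustration only
      Object convert(String literal);
    }

    class ConverterRoutingSketch {
      // primitive no-dict columns -> numeric (measure-style) parsing,
      // all other no-dict columns -> the existing byte[] representation
      static FieldConverter pick(boolean noDictionary, String dataType) {
        if (noDictionary) {
          switch (dataType) {
            case "INT":    return Integer::parseInt;
            case "LONG":   return Long::parseLong;
            case "DOUBLE": return Double::parseDouble;
          }
        }
        return literal -> literal.getBytes(StandardCharsets.UTF_8);
      }
    }
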
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 9cbd607..20278e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -20,8 +20,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
@@ -39,10 +37,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   private int index;
 
-  private DataType dataType;
-
-  private CarbonMeasure measure;
-
   private String nullformat;
 
   private boolean isEmptyBadRecord;
@@ -51,8 +45,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
       boolean isEmptyBadRecord) {
-    this.dataType = dataField.getColumn().getDataType();
-    this.measure = (CarbonMeasure) dataField.getColumn();
     this.nullformat = nullformat;
     this.index = index;
     this.isEmptyBadRecord = isEmptyBadRecord;
@@ -73,20 +65,20 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     Object output;
     boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(literalValue);
     if (literalValue == null || isNull) {
-      String message = logHolder.getColumnMessageMap().get(measure.getColName());
+      String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
       if (null == message) {
-        message = CarbonDataProcessorUtil
-            .prepareFailureReason(measure.getColName(), measure.getDataType());
-        logHolder.getColumnMessageMap().put(measure.getColName(), message);
+        message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+            dataField.getColumn().getDataType());
+        logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
       }
       return null;
     } else if (literalValue.length() == 0) {
       if (isEmptyBadRecord) {
-        String message = logHolder.getColumnMessageMap().get(measure.getColName());
+        String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
         if (null == message) {
-          message = CarbonDataProcessorUtil
-              .prepareFailureReason(measure.getColName(), measure.getDataType());
-          logHolder.getColumnMessageMap().put(measure.getColName(), message);
+          message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+              dataField.getColumn().getDataType());
+          logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
         }
         logHolder.setReason(message);
       }
@@ -96,18 +88,24 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     } else {
       try {
         if (dataField.isUseActualData()) {
-          output =
-              DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure, true);
+          output = DataTypeUtil
+              .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+                  dataField.getColumn().getColumnSchema().getScale(),
+                  dataField.getColumn().getColumnSchema().getPrecision(), true);
         } else {
-          output = DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure);
+          output = DataTypeUtil
+              .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+                  dataField.getColumn().getColumnSchema().getScale(),
+                  dataField.getColumn().getColumnSchema().getPrecision());
         }
         return output;
       } catch (NumberFormatException e) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
         }
-        logHolder.setReason(
-            CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
+        logHolder.setReason(CarbonDataProcessorUtil
+            .prepareFailureReason(dataField.getColumn().getColName(),
+                dataField.getColumn().getDataType()));
         return null;
       }
     }

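With the cached CarbonMeasure gone, the converter works purely off the DataField: it parses the
literal per data type and records a per-column failure reason when parsing fails. A condensed
sketch of that convert flow (simplified types and a plain string log, not the real
BadRecordLogHolder):

    import java.math.BigDecimal;

    class MeasureParseSketch {
      // returns the typed value, or null for null/empty/unparseable input
      static Object parse(String literal, String dataType, StringBuilder reasonLog) {
        if (literal == null || literal.isEmpty()) {
          return null;                       // handled as null / bad record upstream
        }
        try {
          switch (dataType) {
            case "INT":    return Integer.parseInt(literal);
            case "LONG":   return Long.parseLong(literal);
            case "DOUBLE": return Double.parseDouble(literal);
            default:       return new BigDecimal(literal);
          }
        } catch (NumberFormatException e) {
          reasonLog.append("cannot convert '").append(literal).append("' to ").append(dataType);
          return null;                       // value considered as null
        }
      }
    }
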
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
index 64b64f5..3a325a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
@@ -21,7 +21,10 @@ import java.util.Comparator;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 /**
  * comparator for the converted row. The row has not been rearranged as 3-parted yet.
@@ -30,23 +33,38 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 public class RawRowComparator implements Comparator<CarbonRow> {
   private int[] sortColumnIndices;
   private boolean[] isSortColumnNoDict;
+  private DataType[] noDicDataTypes;
 
-  public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict) {
+  public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict,
+      DataType[] noDicDataTypes) {
     this.sortColumnIndices = sortColumnIndices;
     this.isSortColumnNoDict = isSortColumnNoDict;
+    this.noDicDataTypes = noDicDataTypes;
   }
 
   @Override
   public int compare(CarbonRow o1, CarbonRow o2) {
     int diff = 0;
     int i = 0;
+    int noDicIdx = 0;
     for (int colIdx : sortColumnIndices) {
       if (isSortColumnNoDict[i]) {
-        byte[] colA = (byte[]) o1.getObject(colIdx);
-        byte[] colB = (byte[]) o2.getObject(colIdx);
-        diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
-        if (diff != 0) {
-          return diff;
+        if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[noDicIdx])) {
+          // for no dictionary numeric column get comparator based on the data type
+          SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+              .getComparator(noDicDataTypes[noDicIdx]);
+          int difference = comparator.compare(o1.getObject(colIdx), o2.getObject(colIdx));
+          if (difference != 0) {
+            return difference;
+          }
+          noDicIdx++;
+        } else {
+          byte[] colA = (byte[]) o1.getObject(colIdx);
+          byte[] colB = (byte[]) o2.getObject(colIdx);
+          diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
+          if (diff != 0) {
+            return diff;
+          }
         }
       } else {
         int colA = (int) o1.getObject(colIdx);

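For primitive no-dictionary sort columns the comparator now compares typed values instead of
their byte images, which is what makes negative integers and doubles order correctly. A
self-contained sketch of the same dispatch over Object[] rows (plain Java, nulls omitted for
brevity; the byte comparison needs Java 9+ for Arrays.compareUnsigned):

    import java.util.Arrays;
    import java.util.Comparator;

    class RowComparatorSketch {
      // sortIdx points at the sort columns; primitive[i] says whether sort
      // column i holds a typed value or a byte[] image
      static Comparator<Object[]> of(int[] sortIdx, boolean[] primitive) {
        return (a, b) -> {
          for (int i = 0; i < sortIdx.length; i++) {
            int c = sortIdx[i];
            int diff = primitive[i]
                ? ((Comparable<Object>) a[c]).compareTo(b[c])           // typed compare
                : Arrays.compareUnsigned((byte[]) a[c], (byte[]) b[c]); // byte compare
            if (diff != 0) {
              return diff;
            }
          }
          return 0;
        };
      }
    }

Such a comparator instance can then drive Arrays.sort over the converted range bounds, which is
how DataConverterProcessorStepImpl uses RawRowComparator further down in this patch.
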
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 1ad7879..844e45e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -23,7 +23,7 @@ package org.apache.carbondata.processing.loading.row;
  */
 public class IntermediateSortTempRow {
   private int[] dictSortDims;
-  private byte[][] noDictSortDims;
+  private Object[] noDictSortDims;
   /**
    * this will be used for intermediate merger when
    * no sort field and measure field will not be
@@ -35,14 +35,14 @@ public class IntermediateSortTempRow {
    */
   private Object[] measures;
 
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+  public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
       byte[] noSortDimsAndMeasures) {
     this.dictSortDims = dictSortDims;
     this.noDictSortDims = noDictSortDims;
     this.noSortDimsAndMeasures = noSortDimsAndMeasures;
   }
 
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+  public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
       Object[] measures) {
     this.dictSortDims = dictSortDims;
     this.noDictSortDims = noDictSortDims;
@@ -57,7 +57,7 @@ public class IntermediateSortTempRow {
     return measures;
   }
 
-  public byte[][] getNoDictSortDims() {
+  public Object[] getNoDictSortDims() {
     return noDictSortDims;
   }
 

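Widening noDictSortDims from byte[][] to Object[] is what lets a single row carry both kinds of
no-dictionary values side by side. A two-line illustration of what a slot may now hold:

    import java.nio.charset.StandardCharsets;

    class RowSlotSketch {
      public static void main(String[] args) {
        // one slot holds a boxed primitive, the next a byte image
        Object[] noDictSortDims =
            new Object[] { 42L, "abc".getBytes(StandardCharsets.UTF_8) };
        for (Object slot : noDictSortDims) {
          System.out.println(slot.getClass().getSimpleName());  // Long, byte[]
        }
      }
    }
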
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 697f590..edfd317 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -23,10 +23,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
@@ -65,6 +68,14 @@ public class SortStepRowHandler implements Serializable {
 
   private DataType[] dataTypes;
 
+  private DataType[] noDictSortDataTypes;
+
+  private boolean[] noDictSortColMapping;
+
+  private DataType[] noDictNoSortDataTypes;
+
+  private boolean[] noDictNoSortColMapping;
+
   /**
    * constructor
    * @param tableFieldStat table field stat
@@ -85,6 +96,16 @@ public class SortStepRowHandler implements Serializable {
     this.complexDimIdx = tableFieldStat.getComplexDimIdx();
     this.measureIdx = tableFieldStat.getMeasureIdx();
     this.dataTypes = tableFieldStat.getMeasureDataType();
+    this.noDictSortDataTypes = tableFieldStat.getNoDictSortDataType();
+    noDictSortColMapping = new boolean[noDictSortDataTypes.length];
+    for (int i = 0; i < noDictSortDataTypes.length; i++) {
+      noDictSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictSortDataTypes[i]);
+    }
+    this.noDictNoSortDataTypes = tableFieldStat.getNoDictNoSortDataType();
+    noDictNoSortColMapping = new boolean[noDictNoSortDataTypes.length];
+    for (int i = 0; i < noDictNoSortDataTypes.length; i++) {
+      noDictNoSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]);
+    }
   }
 
   /**
@@ -108,8 +129,8 @@ public class SortStepRowHandler implements Serializable {
     try {
       int[] dictDims
           = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
-                                       + this.varcharDimCnt + this.complexDimCnt ][];
+      Object[] nonDictArray = new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+                                       + this.varcharDimCnt + this.complexDimCnt];
       Object[] measures = new Object[this.measureCnt];
 
       // convert dict & data
@@ -125,19 +146,19 @@ public class SortStepRowHandler implements Serializable {
       // convert no-dict & sort
       idxAcc = 0;
       for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.noDictSortDimIdx[idx]];
       }
       // convert no-dict & no-sort
       for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.noDictNoSortDimIdx[idx]];
       }
       // convert varchar dims
       for (int idx = 0; idx < this.varcharDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.varcharDimIdx[idx]];
       }
       // convert complex dims
       for (int idx = 0; idx < this.complexDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.complexDimIdx[idx]];
+        nonDictArray[idxAcc++] = row[this.complexDimIdx[idx]];
       }
 
       // convert measure data
@@ -178,7 +199,7 @@ public class SortStepRowHandler implements Serializable {
   public IntermediateSortTempRow readWithoutNoSortFieldConvert(
       DataInputStream inputStream) throws IOException {
     int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+    Object[] noDictSortDims = new Object[this.noDictSortDimCnt];
 
     // read dict & sort dim data
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -187,10 +208,8 @@ public class SortStepRowHandler implements Serializable {
 
     // read no-dict & sort data
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
+      // for no dict measure column get the original data
+      noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
     }
 
     // read no-dict dims & measures
@@ -213,9 +232,9 @@ public class SortStepRowHandler implements Serializable {
   public IntermediateSortTempRow readWithNoSortFieldConvert(
       DataInputStream inputStream) throws IOException {
     int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictSortDims =
-        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
-            + this.complexDimCnt][];
+    Object[] noDictSortDims =
+        new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt];
 
     // read dict & sort dim data
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -224,10 +243,8 @@ public class SortStepRowHandler implements Serializable {
 
     // read no-dict & sort data
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
+      // for no dict measure column get the original data
+      noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
     }
 
     // read no-dict dims & measures
@@ -240,8 +257,63 @@ public class SortStepRowHandler implements Serializable {
     return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure);
   }
 
+  /**
+   * Return the data from the stream according to the column type
+   *
+   * @param inputStream stream to read the data from
+   * @param idx index of the no dictionary sort column
+   * @throws IOException
+   */
+  private Object getDataForNoDictSortColumn(DataInputStream inputStream, int idx)
+      throws IOException {
+    if (this.noDictSortColMapping[idx]) {
+      return readDataFromStream(inputStream, idx);
+    } else {
+      short len = inputStream.readShort();
+      byte[] bytes = new byte[len];
+      inputStream.readFully(bytes);
+      return bytes;
+    }
+  }
+
+  /**
+   * Read the data from the stream
+   *
+   * @param inputStream stream to read the data from
+   * @param idx index of the no dictionary sort column
+   * @return the data read, or null if the null flag is set
+   * @throws IOException
+   */
+  private Object readDataFromStream(DataInputStream inputStream, int idx) throws IOException {
+    DataType dataType = noDictSortDataTypes[idx];
+    Object data = null;
+    if (!inputStream.readBoolean()) {
+      return null;
+    }
+    if (dataType == DataTypes.BOOLEAN) {
+      data = inputStream.readBoolean();
+    } else if (dataType == DataTypes.BYTE) {
+      data = inputStream.readByte();
+    } else if (dataType == DataTypes.SHORT) {
+      data = inputStream.readShort();
+    } else if (dataType == DataTypes.INT) {
+      data = inputStream.readInt();
+    } else if (dataType == DataTypes.LONG) {
+      data = inputStream.readLong();
+    } else if (dataType == DataTypes.DOUBLE) {
+      data = inputStream.readDouble();
+    } else if (dataType == DataTypes.FLOAT) {
+      data = inputStream.readFloat();
+    } else if (DataTypes.isDecimal(dataType)) {
+      // decimal is written as length-prefixed bytes in writeDataToStream,
+      // so read it back the same way instead of via readUTF
+      short len = inputStream.readShort();
+      byte[] decimalBytes = new byte[len];
+      inputStream.readFully(decimalBytes);
+      data = DataTypeUtil.byteToBigDecimal(decimalBytes);
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      byte[] bytes =
+          inputStream.readUTF().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      data = bytes;
+    }
+    return data;
+  }
+
   private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
-      byte[][] noDictDims, Object[] measures) {
+      Object[] noDictDims, Object[] measures) {
     ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
     // read dict_no_sort
     for (int i = dictSortDimCnt; i < dictDims.length; i++) {
@@ -251,10 +323,15 @@ public class SortStepRowHandler implements Serializable {
     int noDictIndex = noDictSortDimCnt;
     // read no_dict_no_sort
     for (int i = 0; i < noDictNoSortDimCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      noDictDims[noDictIndex++] = bytes;
+      // for no dict measure column get the original data
+      if (this.noDictNoSortColMapping[i]) {
+        noDictDims[noDictIndex++] = getDataFromRowBuffer(noDictNoSortDataTypes[i], rowBuffer);
+      } else {
+        short len = rowBuffer.getShort();
+        byte[] bytes = new byte[len];
+        rowBuffer.get(bytes);
+        noDictDims[noDictIndex++] = bytes;
+      }
     }
 
     // read varchar dims
@@ -275,39 +352,49 @@ public class SortStepRowHandler implements Serializable {
 
     // read measure
     int measureCnt = measures.length;
-    DataType tmpDataType;
     Object tmpContent;
     for (short idx = 0 ; idx < measureCnt; idx++) {
-      if ((byte) 0 == rowBuffer.get()) {
-        measures[idx] = null;
-        continue;
-      }
+      tmpContent = getDataFromRowBuffer(dataTypes[idx], rowBuffer);
+      measures[idx] = tmpContent;
+    }
+  }
 
-      tmpDataType = dataTypes[idx];
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((byte) 1 == rowBuffer.get()) {
-          tmpContent = true;
-        } else {
-          tmpContent = false;
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        tmpContent = rowBuffer.getShort();
-      } else if (DataTypes.INT == tmpDataType) {
-        tmpContent = rowBuffer.getInt();
-      } else if (DataTypes.LONG == tmpDataType) {
-        tmpContent = rowBuffer.getLong();
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        tmpContent = rowBuffer.getDouble();
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        short len = rowBuffer.getShort();
-        byte[] decimalBytes = new byte[len];
-        rowBuffer.get(decimalBytes);
-        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+  /**
+   * Read the data from the row buffer based on the data type.
+   *
+   * @param tmpDataType data type of the value to read
+   * @param rowBuffer buffer to read from
+   * @return the value read, or null if the null flag byte is 0
+   */
+  private Object getDataFromRowBuffer(DataType tmpDataType, ByteBuffer rowBuffer) {
+    Object tmpContent;
+    if ((byte) 0 == rowBuffer.get()) {
+      return null;
+    }
+
+    if (DataTypes.BOOLEAN == tmpDataType) {
+      if ((byte) 1 == rowBuffer.get()) {
+        tmpContent = true;
       } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+        tmpContent = false;
       }
-      measures[idx] = tmpContent;
+    } else if (DataTypes.SHORT == tmpDataType) {
+      tmpContent = rowBuffer.getShort();
+    } else if (DataTypes.INT == tmpDataType) {
+      tmpContent = rowBuffer.getInt();
+    } else if (DataTypes.LONG == tmpDataType) {
+      tmpContent = rowBuffer.getLong();
+    } else if (DataTypes.DOUBLE == tmpDataType) {
+      tmpContent = rowBuffer.getDouble();
+    } else if (DataTypes.isDecimal(tmpDataType)) {
+      short len = rowBuffer.getShort();
+      byte[] decimalBytes = new byte[len];
+      rowBuffer.get(decimalBytes);
+      tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
+    return tmpContent;
   }
 
   /**
@@ -327,9 +414,14 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort dim
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
+      if (this.noDictSortColMapping[idx]) {
+        // write the original data to the stream
+        writeDataToStream(sortTempRow.getNoDictSortDims()[idx], outputStream, idx);
+      } else {
+        byte[] bytes = (byte[]) sortTempRow.getNoDictSortDims()[idx];
+        outputStream.writeShort(bytes.length);
+        outputStream.write(bytes);
+      }
     }
 
     // write packed no-sort dim & measure
@@ -359,9 +451,14 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
+      if (this.noDictSortColMapping[idx]) {
+        // write the original data to the stream
+        writeDataToStream(row[this.noDictSortDimIdx[idx]], outputStream, idx);
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+        outputStream.writeShort(bytes.length);
+        outputStream.write(bytes);
+      }
     }
 
     // pack no-sort
@@ -376,6 +473,46 @@ public class SortStepRowHandler implements Serializable {
   }
 
   /**
+   * Write the data to the stream: a null flag followed by the value according to its data type
+   *
+   * @param data value to write, may be null
+   * @param outputStream stream to write to
+   * @param idx index of the no dictionary sort column
+   * @throws IOException
+   */
+  private void writeDataToStream(Object data, DataOutputStream outputStream, int idx)
+      throws IOException {
+    DataType dataType = noDictSortDataTypes[idx];
+    if (null == data) {
+      outputStream.writeBoolean(false);
+    } else {
+      outputStream.writeBoolean(true);
+      if (dataType == DataTypes.BOOLEAN) {
+        outputStream.writeBoolean((boolean) data);
+      } else if (dataType == DataTypes.BYTE) {
+        outputStream.writeByte((byte) data);
+      } else if (dataType == DataTypes.SHORT) {
+        outputStream.writeShort((short) data);
+      } else if (dataType == DataTypes.INT) {
+        outputStream.writeInt((int) data);
+      } else if (dataType == DataTypes.LONG) {
+        outputStream.writeLong((long) data);
+      } else if (dataType == DataTypes.DOUBLE) {
+        outputStream.writeDouble((double) data);
+      } else if (DataTypes.isDecimal(dataType)) {
+        BigDecimal val = (BigDecimal) data;
+        byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+        outputStream.writeShort(bigDecimalInBytes.length);
+        outputStream.write(bigDecimalInBytes);
+      } else if (dataType == DataTypes.FLOAT) {
+        outputStream.writeFloat((float) data);
+      } else if (dataType == DataTypes.BYTE_ARRAY) {
+        outputStream.writeUTF(data.toString());
+      }
+    }
+  }
+
+  /**
    * Read intermediate sort temp row from unsafe memory.
    * This method is used during merge sort phase for off-heap sort.
    *
@@ -430,9 +567,9 @@ public class SortStepRowHandler implements Serializable {
     int size = 0;
 
     int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictSortDims =
-        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
-            + this.complexDimCnt][];
+    Object[] noDictSortDims =
+        new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt];
 
     // read dict & sort dim
     for (int idx = 0; idx < dictSortDimCnt; idx++) {
@@ -444,11 +581,24 @@ public class SortStepRowHandler implements Serializable {
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
       short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
       size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-      noDictSortDims[idx] = bytes;
+      if (this.noDictSortColMapping[idx]) {
+        // get the original data from the unsafe memory
+        if (0 == length) {
+          // if the length is 0, then the data is null
+          noDictSortDims[idx] = null;
+        } else {
+          Object data = CarbonUnsafeUtil
+              .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+          size += length;
+          noDictSortDims[idx] = data;
+        }
+      } else {
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        size += length;
+        noDictSortDims[idx] = bytes;
+      }
     }
 
     // read no-sort dims & measures
@@ -487,13 +637,26 @@ public class SortStepRowHandler implements Serializable {
     for (int idx = 0; idx < noDictSortDimCnt; idx++) {
       short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
       size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
+      if (this.noDictSortColMapping[idx]) {
+        // get the original data from unsafe memory
+        if (0 == length) {
+          // if the length is 0, then the data is null
+          writeDataToStream(null, outputStream, idx);
+        } else {
+          Object data = CarbonUnsafeUtil
+              .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+          size += length;
+          writeDataToStream(data, outputStream, idx);
+        }
+      } else {
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        size += length;
 
-      outputStream.writeShort(length);
-      outputStream.write(bytes);
+        outputStream.writeShort(length);
+        outputStream.write(bytes);
+      }
     }
 
     // packed no-sort & measure
@@ -534,13 +697,31 @@ public class SortStepRowHandler implements Serializable {
 
     // write no-dict & sort
     for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
-              bytes.length);
-      size += bytes.length;
+      if (this.noDictSortColMapping[idx]) {
+        Object data = row[this.noDictSortDimIdx[idx]];
+        if (null == data) {
+          // if the data is null, then write only the length as 0.
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 0);
+          size += 2;
+        } else {
+          int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes();
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes);
+          size += 2;
+          // put the data to unsafe memory according to its data type
+          CarbonUnsafeUtil
+              .putDataToUnsafe(noDictSortDataTypes[idx], data, baseObject, address, size,
+                  sizeInBytes);
+          size += sizeInBytes;
+        }
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+        CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
+        size += 2;
+        CarbonUnsafe.getUnsafe()
+            .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+                bytes.length);
+        size += bytes.length;
+      }
     }
 
     // convert pack no-sort
@@ -574,9 +755,15 @@ public class SortStepRowHandler implements Serializable {
     }
     // convert no-dict & no-sort
     for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      rowBuffer.putShort((short) bytes.length);
-      rowBuffer.put(bytes);
+      if (this.noDictNoSortColMapping[idx]) {
+        // put the original data to buffer
+        putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]],
+            rowBuffer);
+      } else {
+        byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+        rowBuffer.putShort((short) bytes.length);
+        rowBuffer.put(bytes);
+      }
     }
     // convert varchar dims
     for (int idx = 0; idx < this.varcharDimCnt; idx++) {
@@ -592,37 +779,45 @@ public class SortStepRowHandler implements Serializable {
     }
 
     // convert measure
-    Object tmpValue;
-    DataType tmpDataType;
     for (int idx = 0; idx < this.measureCnt; idx++) {
-      tmpValue = row[this.measureIdx[idx]];
-      tmpDataType = this.dataTypes[idx];
-      if (null == tmpValue) {
-        rowBuffer.put((byte) 0);
-        continue;
-      }
-      rowBuffer.put((byte) 1);
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((boolean) tmpValue) {
-          rowBuffer.put((byte) 1);
-        } else {
-          rowBuffer.put((byte) 0);
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        rowBuffer.putShort((Short) tmpValue);
-      } else if (DataTypes.INT == tmpDataType) {
-        rowBuffer.putInt((Integer) tmpValue);
-      } else if (DataTypes.LONG == tmpDataType) {
-        rowBuffer.putLong((Long) tmpValue);
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        rowBuffer.putDouble((Double) tmpValue);
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
-        rowBuffer.putShort((short) decimalBytes.length);
-        rowBuffer.put(decimalBytes);
+      putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], rowBuffer);
+    }
+  }
+
+  /**
+   * Put the data to the row buffer: a null flag byte followed by the value based on its type
+   *
+   * @param tmpDataType data type of the value
+   * @param tmpValue value to write, may be null
+   * @param rowBuffer buffer to write to
+   */
+  private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, ByteBuffer rowBuffer) {
+    if (null == tmpValue) {
+      rowBuffer.put((byte) 0);
+      return;
+    }
+    rowBuffer.put((byte) 1);
+    if (DataTypes.BOOLEAN == tmpDataType) {
+      if ((boolean) tmpValue) {
+        rowBuffer.put((byte) 1);
       } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+        rowBuffer.put((byte) 0);
       }
+    } else if (DataTypes.SHORT == tmpDataType) {
+      rowBuffer.putShort((Short) tmpValue);
+    } else if (DataTypes.INT == tmpDataType) {
+      rowBuffer.putInt((Integer) tmpValue);
+    } else if (DataTypes.LONG == tmpDataType) {
+      rowBuffer.putLong((Long) tmpValue);
+    } else if (DataTypes.DOUBLE == tmpDataType) {
+      rowBuffer.putDouble((Double) tmpValue);
+    } else if (DataTypes.isDecimal(tmpDataType)) {
+      byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
+      rowBuffer.putShort((short) decimalBytes.length);
+      rowBuffer.put(decimalBytes);
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
     }
   }
+
 }

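writeDataToStream and readDataFromStream together define the sort temp file format for primitive
no-dictionary columns: a boolean null flag followed by the type's fixed-width value (decimals as
length-prefixed bytes). A minimal round-trip sketch of that layout for a single LONG column
(assumed layout as described above, not the full handler):

    import java.io.*;

    class SortTempFormatSketch {
      static void write(DataOutputStream out, Long value) throws IOException {
        if (value == null) {
          out.writeBoolean(false);        // null flag only, no payload
        } else {
          out.writeBoolean(true);
          out.writeLong(value);
        }
      }

      static Long read(DataInputStream in) throws IOException {
        return in.readBoolean() ? in.readLong() : null;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        write(out, 42L);
        write(out, null);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in));     // 42
        System.out.println(read(in));     // null
      }
    }
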
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 8f29cee..b0109fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -20,7 +20,11 @@ package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
 import java.util.Comparator;
 
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -52,6 +56,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     long rowA = rowL.address;
     long rowB = rowR.address;
     int sizeInDictPartA = 0;
+    int noDicSortIdx = 0;
 
     int sizeInNonDictPartA = 0;
     int sizeInDictPartB = 0;
@@ -60,25 +65,50 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
       if (isNoDictionary) {
         short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
             rowA + dictSizeInMemory + sizeInNonDictPartA);
-        byte[] byteArr1 = new byte[lengthA];
         sizeInNonDictPartA += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
-                byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
-        sizeInNonDictPartA += lengthA;
-
         short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
             rowB + dictSizeInMemory + sizeInNonDictPartB);
-        byte[] byteArr2 = new byte[lengthB];
         sizeInNonDictPartB += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
-                byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
-        sizeInNonDictPartB += lengthB;
+        DataType dataType = tableFieldStat.getNoDictDataType()[noDicSortIdx++];
+        if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+          Object data1 = null;
+          if (0 != lengthA) {
+            data1 = CarbonUnsafeUtil
+                .getDataFromUnsafe(dataType, baseObjectL, rowA + dictSizeInMemory,
+                    sizeInNonDictPartA, lengthA);
+            sizeInNonDictPartA += lengthA;
+          }
+          Object data2 = null;
+          if (0 != lengthB) {
+            data2 = CarbonUnsafeUtil
+                .getDataFromUnsafe(dataType, baseObjectR, rowB + dictSizeInMemory,
+                    sizeInNonDictPartB, lengthB);
+            sizeInNonDictPartB += lengthB;
+          }
+          // use the data type based comparator for the no dictionary encoded columns
+          SerializableComparator comparator =
+              org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
+          int difference = comparator.compare(data1, data2);
+          if (difference != 0) {
+            return difference;
+          }
+        } else {
+          byte[] byteArr1 = new byte[lengthA];
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, byteArr1,
+                  CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
+          sizeInNonDictPartA += lengthA;
+
+          byte[] byteArr2 = new byte[lengthB];
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, byteArr2,
+                  CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
+          sizeInNonDictPartB += lengthB;
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+          int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+          if (difference != 0) {
+            return difference;
+          }
         }
       } else {
         int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);

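In the unsafe path there is no separate null flag: the short length prefix doubles as the null
marker (length 0 means null) and a typed value is stored with its fixed width. A sketch of that
encoding using a ByteBuffer as a stand-in for the off-heap page:

    import java.nio.ByteBuffer;

    class UnsafeEncodingSketch {
      // short length prefix (0 == null), then the fixed-width value
      static void putLong(ByteBuffer page, Long value) {
        if (value == null) {
          page.putShort((short) 0);
        } else {
          page.putShort((short) Long.BYTES);
          page.putLong(value);
        }
      }

      static Long getLong(ByteBuffer page) {
        return page.getShort() == 0 ? null : page.getLong();
      }

      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(32);
        putLong(page, -7L);
        putLong(page, null);
        page.flip();
        System.out.println(getLong(page));  // -7
        System.out.println(getLong(page));  // null
      }
    }
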
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 102b057..b805d37 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
@@ -43,6 +44,8 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
   private IntermediateSortTempRow currentRow;
 
+  private DataType[] noDictDataType;
+
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
       boolean[] noDictSortColumnMapping) {
     this.actualSize = merger.getEntryCount();
@@ -52,8 +55,10 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     for (UnsafeCarbonRowPage rowPage: rowPages) {
       rowPage.setReadConvertedNoSortField();
     }
+    this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+    this.comparator =
+        new IntermediateSortTempRowComparator(noDictSortColumnMapping, noDictDataType);
   }
 
   public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 02ffd68..baa9e71 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -45,7 +45,8 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     this.rowPage = rowPage;
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator = new IntermediateSortTempRowComparator(
-        rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+        rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
+        rowPage.getTableFieldStat().getNoDictDataType());
     this.rowPage.setReadConvertedNoSortField();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7c3c056..a991d4c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -109,7 +109,8 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.tableFieldStat = new TableFieldStat(parameters);
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors.newFixedThreadPool(1);
-    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
+        parameters.getNoDictDataType());
     this.convertNoSortFields = convertNoSortFields;
     initialize();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ac13d24..7683bbc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -273,19 +273,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     int dictIndex = 0;
     int nonDicIndex = 0;
     int[] dim = new int[this.dimensionCount];
-    byte[][] nonDicArray = new byte[this.noDictWithComplextCount][];
+    Object[] nonDicArray = new Object[this.noDictWithComplextCount];
     // read dimension values
     int dimCount = 0;
     for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
       if (isNoDictionaryDimensionColumn[dimCount]) {
-        nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+        nonDicArray[nonDicIndex++] = row.getObject(dimCount);
       } else {
         dim[dictIndex++] = (int) row.getObject(dimCount);
       }
     }
 
     for (; dimCount < this.dimensionWithComplexCount; dimCount++) {
-      nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+      nonDicArray[nonDicIndex++] = row.getObject(dimCount);
     }
 
     Object[] measures = new Object[measureCount];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index e3bc97f..ae9ec3d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerI
 import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -134,12 +135,16 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     // sort the range bounds (sort in carbon is a little different from what we think)
     Arrays.sort(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
-            sortColumnRangeInfo.getIsSortColumnNoDict()));
+            sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+                configuration.getTableIdentifier().getTableName())));
 
     // range partitioner to dispatch rows by sort columns
     this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
-            sortColumnRangeInfo.getIsSortColumnNoDict()));
+            sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+                configuration.getTableIdentifier().getTableName())));
   }
 
   // only convert sort column fields

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index ce8b62f..b921675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -266,8 +266,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       Object[] newData = new Object[data.length];
       for (int i = 0; i < data.length; i++) {
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
-          newData[i] = DataTypeUtil
-              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+          if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
+            // keep the no dictionary measure column as original data
+            newData[i] = data[orderOfData[i]];
+          } else {
+            newData[i] = DataTypeUtil
+                .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+          }
         } else {
          // if this is a complex column then recursively convert the data into Byte Array.
           if (dataTypes[i].isComplexType()) {

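The no-converter input path applies the same rule at row-rewrite time: primitive no-dictionary
values pass through as parsed objects, everything else is flattened to bytes up front. A compact
sketch of that rewrite (strings-to-bytes stands in for getBytesDataDataTypeForNoDictionaryColumn):

    import java.nio.charset.StandardCharsets;

    class NoConverterRewriteSketch {
      static Object[] rewrite(Object[] data, boolean[] noDict, boolean[] primitive) {
        Object[] out = new Object[data.length];
        for (int i = 0; i < data.length; i++) {
          if (noDict[i] && !primitive[i]) {
            // non-primitive no-dict values are flattened to their byte form
            out[i] = String.valueOf(data[i]).getBytes(StandardCharsets.UTF_8);
          } else {
            out[i] = data[i];            // primitives and dictionary values kept as-is
          }
        }
        return out;
      }
    }
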
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 0fc229a..1aa6da8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -91,6 +91,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * boolean mapping for no dictionary columns in schema
    */
   private boolean[] noDictionaryColMapping;
+
+  private boolean[] sortColumnMapping;
   /**
    * boolean mapping for long string dimension
    */
@@ -275,7 +277,15 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         preparedRow[i] = dictionaryValues[dictionaryIndex++];
       } else {
         // no dictionary dims
-        preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+        byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+        if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+          // no dictionary measure columns are expected as original data
+          preparedRow[i] = DataTypeUtil
+              .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
+                  dims.getDataType());
+        } else {
+          preparedRow[i] = noDictionaryKeyByIndex;
+        }
       }
     }
     // fill all the measures
@@ -357,6 +367,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     measureCount = carbonTable.getMeasureByTableName(tableName).size();
     List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
     noDictionaryColMapping = new boolean[dimensions.size()];
+    sortColumnMapping = new boolean[dimensions.size()];
     isVarcharDimMapping = new boolean[dimensions.size()];
     int i = 0;
     for (CarbonDimension dimension : dimensions) {
@@ -364,6 +375,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         i++;
         continue;
       }
+      if (dimension.isSortColumn()) {
+        sortColumnMapping[i] = true;
+      }
       noDictionaryColMapping[i] = true;
       if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
         isVarcharDimMapping[i] = true;
@@ -395,8 +409,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     return SortParameters
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
-            noDictionaryCount, segmentId,
-            carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
+            noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
+            sortColumnMapping, isVarcharDimMapping, true);
   }
 
   /**
@@ -404,14 +418,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * sort temp files
    */
   private void initializeFinalThreadMergerForMergeSort() {
-    boolean[] noDictionarySortColumnMapping = null;
-    if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) {
-      noDictionarySortColumnMapping = noDictionaryColMapping;
-    } else {
-      noDictionarySortColumnMapping = new boolean[this.segmentProperties.getNumberOfSortColumns()];
-      System.arraycopy(noDictionaryColMapping, 0,
-          noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
-    }
+    boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
+        .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName());
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

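During compaction the rows come back from the query path with every no-dictionary value as a
byte[], so primitive columns have to be decoded back to their typed form before re-sorting. A
sketch of that decode step under an assumed big-endian fixed-width encoding (the real decode is
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn, whose exact byte layout may differ):

    import java.nio.ByteBuffer;

    class CompactionDecodeSketch {
      static Object decode(byte[] key, String dataType) {
        ByteBuffer buf = ByteBuffer.wrap(key);   // big-endian by default
        switch (dataType) {
          case "INT":    return buf.getInt();
          case "LONG":   return buf.getLong();
          case "DOUBLE": return buf.getDouble();
          default:       return key;             // non-primitive columns stay as bytes
        }
      }
    }
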
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b877d52..2911c05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -53,6 +54,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
   private SegmentProperties segprop;
   private CarbonLoadModel loadModel;
   private PartitionSpec partitionSpec;
+
+  CarbonColumn[] noDicAndComplexColumns;
   /**
    * record holder heap
    */
@@ -86,6 +89,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
     setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+    this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 
@@ -200,7 +204,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    * @throws SliceMergerException
    */
   private void addRow(Object[] carbonTuple) throws SliceMergerException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop);
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns);
     try {
       this.dataHandler.addDataToStore(row);
     } catch (CarbonDataWriterException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 2dc79a3..00fbc7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -37,6 +38,8 @@ public class RowResultProcessor {
   private CarbonFactHandler dataHandler;
   private SegmentProperties segmentProperties;
 
+  private CarbonColumn[] noDicAndComplexColumns;
+
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(RowResultProcessor.class.getName());
 
@@ -59,6 +62,7 @@ public class RowResultProcessor {
     //Note: set compaction flow just to convert decimal type
     carbonFactDataHandlerModel.setCompactionFlow(true);
     carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+    noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 
@@ -97,7 +101,8 @@ public class RowResultProcessor {
   }
 
   private void addRow(Object[] carbonTuple) throws CarbonDataWriterException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties,
+        noDicAndComplexColumns);
     try {
       this.dataHandler.addDataToStore(row);
     } catch (CarbonDataWriterException e) {

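In both RowResultMergerProcessor and RowResultProcessor above, WriteStepRowUtil.fromMergerRow now receives the table's no-dictionary and complex columns. The extra argument presumably lets the row conversion keep primitive no-dictionary values typed while leaving string and complex values as byte arrays. A hedged sketch of that branching; splitByType and its inputs are illustrative, not the actual fromMergerRow internals:

  import org.apache.carbondata.core.metadata.datatype.DataType;
  import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
  import org.apache.carbondata.core.util.DataTypeUtil;

  public final class NoDictValueSplitSketch {
    // Sketch only: the real fromMergerRow assembles a CarbonRow; this shows
    // just the presumed per-column branching on the passed-in columns.
    static Object[] splitByType(Object[] noDictValues, CarbonColumn[] noDicAndComplexColumns) {
      Object[] converted = new Object[noDictValues.length];
      for (int i = 0; i < noDictValues.length; i++) {
        DataType dataType = noDicAndComplexColumns[i].getDataType();
        if (DataTypeUtil.isPrimitiveColumn(dataType)) {
          // primitive no-dictionary column: keep the typed value so later
          // steps can compare and encode it by data type
          converted[i] = noDictValues[i];
        } else {
          // string/complex column: stays in byte-array form
          converted[i] = (byte[]) noDictValues[i];
        }
      }
      return converted;
    }
  }
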
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
index 9b6d1e8..54fa99e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.util.Comparator;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 
 /**
@@ -31,11 +34,15 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
    */
   private boolean[] isSortColumnNoDictionary;
 
+  // data types of the no-dictionary sort columns, used to pick typed comparators
+  private DataType[] noDicSortDataTypes;
+
   /**
    * @param isSortColumnNoDictionary true for each sort column that is no-dictionary
    * @param noDicSortDataTypes data types of the no-dictionary sort columns
    */
-  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
+  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary,
+      DataType[] noDicSortDataTypes) {
     this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+    this.noDicSortDataTypes = noDicSortDataTypes;
   }
 
   /**
@@ -45,18 +52,31 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
     int diff = 0;
     int dictIndex = 0;
     int nonDictIndex = 0;
+    int noDicTypeIdx = 0;
 
     for (boolean isNoDictionary : isSortColumnNoDictionary) {
 
       if (isNoDictionary) {
-        byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
-        byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
-        nonDictIndex++;
+        if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
+          // use a data-type-based comparator for no-dictionary primitive columns
+          SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+              .getComparator(noDicSortDataTypes[noDicTypeIdx]);
+          int difference = comparator.compare(rowA.getNoDictSortDims()[nonDictIndex],
+              rowB.getNoDictSortDims()[nonDictIndex]);
+          if (difference != 0) {
+            return difference;
+          }
+          noDicTypeIdx++;
+        } else {
+          byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+          byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+          int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+          if (difference != 0) {
+            return difference;
+          }
         }
+        nonDictIndex++;
       } else {
         int dimFieldA = rowA.getDictSortDims()[dictIndex];
         int dimFieldB = rowB.getDictSortDims()[dictIndex];

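The comparator above now keeps two cursors over the sort columns: nonDictIndex advances for every no-dictionary column, while noDicTypeIdx advances only past the primitive ones, whose comparison is delegated to a data-type-aware SerializableComparator instead of raw byte order. A self-contained illustration of why the typed path matters; the column layout and sample data are made up, but the carbondata calls are the ones used in the patch:

  import org.apache.carbondata.core.metadata.datatype.DataType;
  import org.apache.carbondata.core.metadata.datatype.DataTypes;
  import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
  import org.apache.carbondata.core.util.DataTypeUtil;
  import org.apache.carbondata.core.util.comparator.Comparator;
  import org.apache.carbondata.core.util.comparator.SerializableComparator;

  public final class MixedKeyCompareSketch {
    // Compares two no-dictionary sort keys the way the patched comparator does:
    // typed comparison for primitive columns, byte-wise for string columns.
    static int compareKeys(Object[] keyA, Object[] keyB, DataType[] types) {
      for (int i = 0; i < types.length; i++) {
        int diff;
        if (DataTypeUtil.isPrimitiveColumn(types[i])) {
          SerializableComparator comparator = Comparator.getComparator(types[i]);
          diff = comparator.compare(keyA[i], keyB[i]);
        } else {
          diff = UnsafeComparer.INSTANCE.compareTo((byte[]) keyA[i], (byte[]) keyB[i]);
        }
        if (diff != 0) {
          return diff;
        }
      }
      return 0;
    }

    public static void main(String[] args) {
      DataType[] types = { DataTypes.INT, DataTypes.STRING };
      Object[] a = { 2, "abc".getBytes() };
      Object[] b = { 10, "abd".getBytes() };
      // typed compare puts 2 before 10; comparing "2" and "10" as bytes
      // would have reversed them, which is what this refactor avoids
      System.out.println(compareKeys(a, b, types) < 0);
    }
  }
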
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index f47ecc7..4dff644 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -20,7 +20,10 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.Serializable;
 import java.util.Comparator;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class NewRowComparator implements Comparator<Object[]>, Serializable {
   private static final long serialVersionUID = -1739874611112709436L;
@@ -28,13 +31,20 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
   /**
    * mapping of dimension columns: true for each no-dictionary dimension.
    */
-  private boolean[] noDictionarySortColumnMaping;
+  private boolean[] noDicDimColMapping;
+
+  // data types of the no-dictionary columns of the table
+  private DataType[] noDicDataTypes;
+
+  // flags the no-dictionary columns that participate in sort
+  private boolean[] noDicSortColumnMapping;
 
   /**
-   * @param noDictionarySortColumnMaping
+   * @param noDicDimColMapping true for each dimension that is a no-dictionary column
+   * @param noDicSortColumnMapping flags the no-dictionary columns that participate in sort
+   * @param noDicDataTypes data types of the no-dictionary columns
    */
-  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  public NewRowComparator(boolean[] noDicDimColMapping, boolean[] noDicSortColumnMapping,
+      DataType[] noDicDataTypes) {
+    this.noDicDimColMapping = noDicDimColMapping;
+    this.noDicSortColumnMapping = noDicSortColumnMapping;
+    this.noDicDataTypes = noDicDataTypes;
   }
 
   /**
@@ -43,15 +53,31 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
     int index = 0;
+    int dataTypeIdx = 0;
+    int noDicSortIdx = 0;
 
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-      if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[index];
-        byte[] byteArr2 = (byte[]) rowB[index];
+    for (int i = 0; i < noDicDimColMapping.length; i++) {
+      if (noDicDimColMapping[i]) {
+        if (noDicSortColumnMapping[noDicSortIdx++]) {
+          if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[dataTypeIdx])) {
+            // use a data-type-based comparator for no-dictionary primitive columns
+            SerializableComparator comparator =
+                org.apache.carbondata.core.util.comparator.Comparator
+                    .getComparator(noDicDataTypes[dataTypeIdx]);
+            int difference = comparator.compare(rowA[index], rowB[index]);
+            if (difference != 0) {
+              return difference;
+            }
+            dataTypeIdx++;
+          } else {
+            byte[] byteArr1 = (byte[]) rowA[index];
+            byte[] byteArr2 = (byte[]) rowB[index];
 
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
+            int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+            if (difference != 0) {
+              return difference;
+            }
+          }
         }
       } else {
         int dimFieldA = (int) rowA[index];

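NewRowComparator now tracks three parallel cursors: index walks every dimension, noDicSortIdx walks only the no-dictionary dimensions (to test whether the current one participates in sort), and dataTypeIdx walks the primitive no-dictionary columns that get a typed comparison. The SortDataRows hunks below wire the three arrays in from SortParameters; a small usage sketch, with an assumed layout of one dictionary dimension plus one no-dictionary LONG sort dimension:

  import java.util.Arrays;

  import org.apache.carbondata.core.metadata.datatype.DataType;
  import org.apache.carbondata.core.metadata.datatype.DataTypes;
  import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;

  public final class NewRowComparatorUsageSketch {
    public static void main(String[] args) {
      boolean[] noDicDimColMapping = { false, true };   // dim 1 is no-dictionary
      boolean[] noDicSortColumnMapping = { true };      // ...and participates in sort
      DataType[] noDicDataTypes = { DataTypes.LONG };   // ...with a primitive type

      Object[][] rows = { { 5, 20L }, { 5, 3L } };
      Arrays.sort(rows, new NewRowComparator(noDicDimColMapping,
          noDicSortColumnMapping, noDicDataTypes));
      // the typed comparator orders 3L before 20L, where the old byte-wise
      // comparison of their string encodings would have put 20 first
      System.out.println(rows[0][1]);
    }
  }
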
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index a5caf7b..730c729 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -203,7 +203,9 @@ public class SortDataRows {
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
       if (parameters.getNumberOfNoDictSortColumns() > 0) {
-        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
+        Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionarySortColumn(),
+            parameters.getNoDictDataType()));
       } else {
         Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
       }
@@ -315,7 +317,8 @@ public class SortDataRows {
         long startTime = System.currentTimeMillis();
         if (parameters.getNumberOfNoDictSortColumns() > 0) {
           Arrays.sort(recordHolderArray,
-              new NewRowComparator(parameters.getNoDictionarySortColumn()));
+              new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(), parameters.getNoDictDataType()));
         } else {
           Arrays.sort(recordHolderArray,
               new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
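When a table has no no-dictionary sort columns, both call sites above fall back to NewRowComparatorForNormalDims, which needs only the sort-column count because every sort key is then a leading dictionary surrogate. Presumably it reduces to an int comparison over that prefix, along these lines (a sketch, not the actual class):

  // Sketch only: presumed behaviour of NewRowComparatorForNormalDims, where
  // the first numberOfSortColumns fields are dictionary surrogate ints.
  static int compareNormalDims(Object[] rowA, Object[] rowB, int numberOfSortColumns) {
    for (int i = 0; i < numberOfSortColumns; i++) {
      int diff = Integer.compare((int) rowA[i], (int) rowB[i]);
      if (diff != 0) {
        return diff;
      }
    }
    return 0;
  }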