You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/18 13:38:18 UTC

[4/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

[CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types

Loading configurations and settings
(1) Parse data like that of a measure, so change FieldEncoderFactory to take up the measure flow
(2) While creating loading configurations, no dictionary, sort columns should be taken care in all the needed flows

Sort rows preparation
(1) Prepare the row to be sorted with original data for no dictionary columns
(2) Use data type based comparators for the no dictionary sort columns in all the flows like Intermediate Sort, Final sort, Unsafe sort
(3) Handle read write of row with no dictionary primitive data types to intermediate files and in the final file merger, as we will be reading and writing as original data
(4) Get the no dictionary sort data types from the load configurations what we set in LOAD step

Adding to Column page and apply adaptive encoding
(1) Add the no dictionary primitive datatypes data as original data
(2) Apply adaptive encoding to the page
(3) Reuse the adaptive encoding techniques existing for measure column

Writing inverted index to adaptive encoded page
(1) Prepare the inverted index list based on the data type based comparison
(2) Apply RLE on the inverted index
(3) Write the inverted index to the encoded page

Create decoder while querying
(1) Create proper decoder for the no dictionary column pages
(2) Uncompress the column page and also the inverted index

Filter flow changes
(1) FilterValues will be in bytes, so convert the data to bytes for comparison
(2) Change the isScanRequired to compare min/max values based on the data type

Fill output row in case of queries
(1) Change the noDictionaryKeys to Object; now it can be data type based data for no dictionary primitive data types

Bloom filter changes
(1) Change bloom filter load
(2) While rebuilding the data map, the load expects the data to be original data. Therefore a conversion is used
(3) Fill the no dictionary primitive data as original data

Compaction Changes
Compaction will get the rows from the result collectors. But the result collectors will give bytes for no dictionary columns.
So a conversion is needed to convert the bytes to original data based on the data type.

This closes #2654


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c8f70630
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c8f70630
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c8f70630

Branch: refs/heads/master
Commit: c8f7063048115d161de539cf277cc1ccb015159b
Parents: 61fcdf2
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Aug 22 12:45:44 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Sep 18 19:12:56 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datastore/TableSpec.java    |  17 +
 ...mpressedDimensionChunkFileBasedReaderV3.java |  42 +-
 .../chunk/store/ColumnPageWrapper.java          | 106 ++++-
 ...feVariableLengthDimensionDataChunkStore.java |  28 +-
 .../datastore/columnar/BlockIndexerStorage.java | 104 +++++
 .../BlockIndexerStorageForNoDictionary.java     | 116 ++++++
 ...ndexerStorageForNoInvertedIndexForShort.java |  17 +-
 .../columnar/BlockIndexerStorageForShort.java   |  71 +---
 .../ColumnWithRowIdForNoDictionary.java         |  72 ++++
 .../core/datastore/columnar/IndexStorage.java   |  35 --
 .../page/encoding/ColumnPageEncoder.java        |  48 ++-
 .../page/encoding/DefaultEncodingFactory.java   |  70 +++-
 .../page/encoding/EncodingFactory.java          |  18 +-
 .../page/encoding/adaptive/AdaptiveCodec.java   | 195 ++++++++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  31 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  30 +-
 .../adaptive/AdaptiveFloatingCodec.java         |  30 +-
 .../adaptive/AdaptiveIntegralCodec.java         |  30 +-
 .../legacy/ComplexDimensionIndexCodec.java      |   4 +-
 .../legacy/DictDimensionIndexCodec.java         |   4 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   4 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   4 +-
 .../dimension/legacy/IndexStorageEncoder.java   |   8 +-
 .../core/datastore/page/key/TablePageKey.java   |   3 +-
 .../page/statistics/TablePageStatistics.java    |  14 +-
 .../core/datastore/row/WriteStepRowUtil.java    |  28 +-
 .../core/scan/executor/util/QueryUtil.java      |  36 ++
 .../carbondata/core/scan/filter/FilterUtil.java |  42 +-
 .../executer/ExcludeFilterExecuterImpl.java     |   2 +-
 .../executer/IncludeFilterExecuterImpl.java     |  58 ++-
 .../executer/RangeValueFilterExecuterImpl.java  |  39 +-
 .../executer/RestructureEvaluatorImpl.java      |   4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  10 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  55 ++-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  55 ++-
 ...velRangeLessThanEqualFilterExecuterImpl.java |  53 ++-
 ...RowLevelRangeLessThanFilterExecuterImpl.java |  53 ++-
 .../RowLevelRangeFilterResolverImpl.java        |   2 +-
 .../core/scan/result/BlockletScannedResult.java |   5 +-
 .../result/impl/FilterQueryScannedResult.java   |   9 +-
 .../carbondata/core/util/CarbonUnsafeUtil.java  |  95 +++++
 .../apache/carbondata/core/util/CarbonUtil.java |  22 +
 .../carbondata/core/util/DataTypeUtil.java      |  58 ++-
 .../carbondata/core/util/NonDictionaryUtil.java |   2 +-
 .../page/encoding/TestEncodingFactory.java      |  17 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |   6 +-
 .../core/util/RangeFilterProcessorTest.java     |  23 ++
 .../bloom/AbstractBloomDataMapWriter.java       |   4 +-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  15 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |   9 +-
 .../datamap/bloom/BloomDataMapWriter.java       |  10 +-
 .../hadoop/testutil/StoreCreator.java           |  39 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   4 +-
 .../datasource/SparkCarbonDataSourceTest.scala  |   2 +-
 .../datamap/IndexDataMapRebuildRDD.scala        |  13 +-
 .../CarbonGetTableDetailComandTestCase.scala    |   6 +-
 .../loading/CarbonDataLoadConfiguration.java    |  40 ++
 .../converter/impl/FieldEncoderFactory.java     |   6 +
 .../impl/MeasureFieldConverterImpl.java         |  40 +-
 .../partition/impl/RawRowComparator.java        |  30 +-
 .../loading/row/IntermediateSortTempRow.java    |   8 +-
 .../loading/sort/SortStepRowHandler.java        | 411 ++++++++++++++-----
 .../unsafe/comparator/UnsafeRowComparator.java  |  58 ++-
 .../holder/UnsafeFinalMergePageHolder.java      |   7 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |   3 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   3 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |   6 +-
 .../steps/DataConverterProcessorStepImpl.java   |   9 +-
 .../InputProcessorStepWithNoConverterImpl.java  |   9 +-
 .../merger/CompactionResultSortProcessor.java   |  30 +-
 .../merger/RowResultMergerProcessor.java        |   6 +-
 .../partition/spliter/RowResultProcessor.java   |   7 +-
 .../IntermediateSortTempRowComparator.java      |  34 +-
 .../sort/sortdata/NewRowComparator.java         |  48 ++-
 .../processing/sort/sortdata/SortDataRows.java  |   7 +-
 .../sort/sortdata/SortParameters.java           |  94 ++++-
 .../sort/sortdata/SortTempFileChunkHolder.java  |   2 +-
 .../sort/sortdata/TableFieldStat.java           |  34 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  13 +-
 .../store/CarbonFactDataHandlerModel.java       |  25 ++
 .../carbondata/processing/store/TablePage.java  |  62 ++-
 .../util/CarbonDataProcessorUtil.java           |  80 +++-
 82 files changed, 2328 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index bded430..a26d6ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -104,6 +104,23 @@ public class TableSpec {
     }
   }
 
+  /**
+   * No dictionary and complex dimensions of the table
+   *
+   * @return
+   */
+  public DimensionSpec[] getNoDictAndComplexDimensions() {
+    List<DimensionSpec> noDictAndComplexDimensions = new ArrayList<>();
+    for (int i = 0; i < dimensionSpec.length; i++) {
+      if (dimensionSpec[i].getColumnType() == ColumnType.PLAIN_VALUE
+          || dimensionSpec[i].getColumnType() == ColumnType.COMPLEX_PRIMITIVE
+          || dimensionSpec[i].getColumnType() == ColumnType.COMPLEX) {
+        noDictAndComplexDimensions.add(dimensionSpec[i]);
+      }
+    }
+    return noDictAndComplexDimensions.toArray(new DimensionSpec[noDictAndComplexDimensions.size()]);
+  }
+
   public DimensionSpec getDimensionSpec(int dimensionIndex) {
     return dimensionSpec[dimensionIndex];
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 486cc2d..b96e52e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -237,41 +237,39 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
         .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
   }
 
-  private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
-    List<Encoding> encodings = pageMetadata.getEncoders();
-    if (encodings != null && encodings.size() == 1) {
-      Encoding encoding = encodings.get(0);
-      switch (encoding) {
-        case DIRECT_COMPRESS:
-        case DIRECT_STRING:
-        case ADAPTIVE_INTEGRAL:
-        case ADAPTIVE_DELTA_INTEGRAL:
-        case ADAPTIVE_FLOATING:
-        case ADAPTIVE_DELTA_FLOATING:
-          return true;
-      }
-    }
-    return false;
-  }
-
   protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
       ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
       throws IOException, MemoryException {
-    if (isEncodedWithMeta(pageMetadata)) {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    if (CarbonUtil.isEncodedWithMeta(encodings)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
           null != rawColumnPage.getLocalDictionary());
       decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
-      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
-          isEncodedWithAdaptiveMeta(pageMetadata));
+      int[] invertedIndexes = new int[0];
+      int[] invertedIndexesReverse = new int[0];
+      // in case of no dictionary measure data types, if it is included in sort columns
+      // then inverted index to be uncompressed
+      if (encodings.contains(Encoding.INVERTED_INDEX)) {
+        offset += pageMetadata.data_page_length;
+        if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
+          invertedIndexes = CarbonUtil
+              .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+          // get the reverse index
+          invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+        }
+      }
+      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
+          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
+          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
     } else {
       // following code is for backward compatibility
       return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
     }
   }
 
-  private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
+  public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
     List<Encoding> encodings = pageMetadata.getEncoders();
-    if (encodings != null && encodings.size() == 1) {
+    if (encodings != null && !encodings.isEmpty()) {
       Encoding encoding = encodings.get(0);
       switch (encoding) {
         case ADAPTIVE_INTEGRAL:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 65991a5..176a3e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -24,10 +24,13 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 
 public class ColumnPageWrapper implements DimensionColumnPage {
@@ -36,14 +39,23 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   private CarbonDictionary localDictionary;
 
-  private boolean isAdaptiveComplexPrimitivePage;
+  private boolean isAdaptivePrimitivePage;
+
+  private int[] invertedIndex;
+
+  private int[] invertedReverseIndex;
+
+  private boolean isExplicitSorted;
 
   public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary,
-      boolean isAdaptiveComplexPrimitivePage) {
+      int[] invertedIndex, int[] invertedReverseIndex, boolean isAdaptivePrimitivePage,
+      boolean isExplicitSorted) {
     this.columnPage = columnPage;
     this.localDictionary = localDictionary;
-    this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage;
-
+    this.invertedIndex = invertedIndex;
+    this.invertedReverseIndex = invertedReverseIndex;
+    this.isAdaptivePrimitivePage = isAdaptivePrimitivePage;
+    this.isExplicitSorted = isExplicitSorted;
   }
 
   @Override
@@ -58,26 +70,79 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   @Override
   public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
-    throw new UnsupportedOperationException("internal error");
+    ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      fillRow(i, vector, vectorOffset++);
+    }
+    return chunkIndex + 1;
+  }
+
+  /**
+   * Fill the data to the vector
+   *
+   * @param rowId
+   * @param vector
+   * @param vectorRow
+   */
+  private void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    if (columnPage.getNullBits().get(rowId)
+        && columnPage.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+      // if this row is null, return default null represent in byte array
+      byte[] value = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
+    } else if (columnPage.getNullBits().get(rowId)) {
+      // if this row is null, return default null represent in byte array
+      byte[] value = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
+    } else {
+      if (isExplicitSorted) {
+        rowId = invertedReverseIndex[rowId];
+      }
+      byte[] value = getChunkData(rowId, true);
+      int length = value.length;
+      QueryUtil.putDataToVector(vector, value, vectorRow, length);
+    }
   }
 
   @Override
   public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int chunkIndex) {
-    throw new UnsupportedOperationException("internal error");
+    ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      fillRow(filteredRowId[i], vector, vectorOffset++);
+    }
+    return chunkIndex + 1;
   }
 
   @Override public byte[] getChunkData(int rowId) {
+    return getChunkData(rowId, false);
+  }
+
+  private byte[] getChunkData(int rowId, boolean isRowIdChanged) {
     ColumnType columnType = columnPage.getColumnSpec().getColumnType();
     DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
     DataType targetDataType = columnPage.getDataType();
     if (null != localDictionary) {
       return localDictionary
           .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
-    } else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) {
-      if (columnPage.getNullBits().get(rowId)) {
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptivePrimitive()) || (
+        columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
+          && columnType == ColumnType.COMPLEX_PRIMITIVE) {
         // if this row is null, return default null represent in byte array
         return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       }
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        // if this row is null, return default null represent in byte array
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
       if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
         double doubleData = columnPage.getDouble(rowId);
         if (srcDataType == DataTypes.FLOAT) {
@@ -118,15 +183,20 @@ public class ColumnPageWrapper implements DimensionColumnPage {
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
-    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) {
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptivePrimitive())) {
+      if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
       if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
         byte[] out = new byte[1];
         out[0] = (columnPage.getByte(rowId));
-        return out;
+        return ByteUtil.toBytes(ByteUtil.toBoolean(out));
       } else if (srcDataType == DataTypes.BYTE_ARRAY) {
         return columnPage.getBytes(rowId);
-      }  else if (srcDataType == DataTypes.DOUBLE) {
+      } else if (srcDataType == DataTypes.DOUBLE) {
         return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
+      } else if (srcDataType == targetDataType) {
+        return columnPage.getBytes(rowId);
       } else {
         throw new RuntimeException("unsupported type: " + targetDataType);
       }
@@ -135,15 +205,14 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     }
   }
 
-
   @Override
   public int getInvertedIndex(int rowId) {
-    throw new UnsupportedOperationException("internal error");
+    return invertedIndex[rowId];
   }
 
   @Override
   public int getInvertedReverseIndex(int rowId) {
-    throw new UnsupportedOperationException("internal error");
+    return invertedReverseIndex[rowId];
   }
 
   @Override
@@ -153,12 +222,13 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   @Override
   public boolean isExplicitSorted() {
-    return false;
+    return isExplicitSorted;
   }
 
   @Override
   public int compareTo(int rowId, byte[] compareValue) {
-    throw new UnsupportedOperationException("internal error");
+    byte[] chunkData = this.getChunkData((int) rowId);
+    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
   }
 
   @Override
@@ -169,8 +239,8 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     }
   }
 
-  public boolean isAdaptiveComplexPrimitive() {
-    return isAdaptiveComplexPrimitivePage;
+  public boolean isAdaptivePrimitive() {
+    return isAdaptivePrimitivePage;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
index 954cab2..15217b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
@@ -21,11 +21,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Below class is responsible to store variable length dimension data chunk in
@@ -236,28 +233,7 @@ public abstract class UnsafeVariableLengthDimensionDataChunkStore
     }
     // get the row from unsafe
     fillRowInternal(length, value, currentDataOffset);
-    DataType dt = vector.getType();
-    if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
-        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
-            CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
-      vector.putNull(vectorRow);
-    } else {
-      if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, 0, length, value);
-      } else if (dt == DataTypes.BOOLEAN) {
-        vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
-      } else if (dt == DataTypes.SHORT) {
-        vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
-      } else if (dt == DataTypes.INT) {
-        vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
-      } else if (dt == DataTypes.LONG) {
-        vector.putLong(vectorRow,
-            DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
-                length));
-      } else if (dt == DataTypes.TIMESTAMP) {
-        vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
-      }
-    }
+    QueryUtil.putDataToVector(vector, value, vectorRow, length);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
new file mode 100644
index 0000000..6f3f139
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public abstract class BlockIndexerStorage<T> {
+
+  public abstract short[] getRowIdPage();
+
+  public abstract int getRowIdPageLengthInBytes();
+
+  public abstract short[] getRowIdRlePage();
+
+  public abstract int getRowIdRlePageLengthInBytes();
+
+  public abstract T getDataPage();
+
+  public abstract short[] getDataRlePage();
+
+  public abstract int getDataRlePageLengthInBytes();
+
+  /**
+   * It compresses depends up on the sequence numbers.
+   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]. In
+   * first array the start and end of sequential numbers and second array
+   * keeps the indexes of where sequential numbers starts. If there is no
+   * sequential numbers then the same array it returns with empty second
+   * array.
+   *
+   * @param rowIds
+   */
+  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
+      short[] rowIdRlePage) {
+    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    int k = 0;
+    int i = 1;
+    for (; i < rowIds.length; i++) {
+      if (rowIds[i] - rowIds[i - 1] == 1) {
+        k++;
+      } else {
+        if (k > 0) {
+          map.add(((short) list.size()));
+          list.add(rowIds[i - k - 1]);
+          list.add(rowIds[i - 1]);
+        } else {
+          list.add(rowIds[i - 1]);
+        }
+        k = 0;
+      }
+    }
+    if (k > 0) {
+      map.add(((short) list.size()));
+      list.add(rowIds[i - k - 1]);
+      list.add(rowIds[i - 1]);
+    } else {
+      list.add(rowIds[i - 1]);
+    }
+    int compressionPercentage = (((list.size() + map.size()) * 100) / rowIds.length);
+    if (compressionPercentage > 70) {
+      rowIdPage = rowIds;
+    } else {
+      rowIdPage = convertToArray(list);
+    }
+    if (rowIds.length == rowIdPage.length) {
+      rowIdRlePage = new short[0];
+    } else {
+      rowIdRlePage = convertToArray(map);
+    }
+    Map<String, short[]> rowIdAndRowRleIdPages = new HashMap<>(2);
+    rowIdAndRowRleIdPages.put("rowIdPage", rowIdPage);
+    rowIdAndRowRleIdPages.put("rowRlePage", rowIdRlePage);
+    return rowIdAndRowRleIdPages;
+  }
+
+  protected short[] convertToArray(List<Short> list) {
+    short[] shortArray = new short[list.size()];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
new file mode 100644
index 0000000..b3e25d3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Object[]> {
+
+  private short[] rowIdPage;
+
+  private short[] rowIdRlePage;
+
+  private Object[] dataPage;
+
+  private DataType dataType;
+
+  public BlockIndexerStorageForNoDictionary(Object[] dataPage, DataType dataType,
+      boolean isSortRequired) {
+    this.dataType = dataType;
+    ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId = createColumnWithRowId(dataPage);
+    if (isSortRequired) {
+      Arrays.sort(dataWithRowId);
+    }
+    short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
+    Map<String, short[]> rowIdAndRleRowIdPages =
+        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
+    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
+  }
+
+  /**
+   * Create an object with each column array and respective rowId
+   *
+   * @return
+   */
+  private ColumnWithRowIdForNoDictionary<Short>[] createColumnWithRowId(Object[] dataPage) {
+    ColumnWithRowIdForNoDictionary<Short>[] columnWithIndexs =
+        new ColumnWithRowIdForNoDictionary[dataPage.length];
+    for (short i = 0; i < columnWithIndexs.length; i++) {
+      columnWithIndexs[i] = new ColumnWithRowIdForNoDictionary<>(dataPage[i], i, dataType);
+    }
+    return columnWithIndexs;
+  }
+
+  private short[] extractDataAndReturnRowId(ColumnWithRowIdForNoDictionary<Short>[] dataWithRowId,
+      Object[] dataPage) {
+    short[] indexes = new short[dataWithRowId.length];
+    for (int i = 0; i < indexes.length; i++) {
+      indexes[i] = dataWithRowId[i].getIndex();
+      dataPage[i] = dataWithRowId[i].getColumn();
+    }
+    this.dataPage = dataPage;
+    return indexes;
+  }
+
+  /**
+   * @return the rowIdPage
+   */
+  @Override
+  public short[] getRowIdPage() {
+    return rowIdPage;
+  }
+
+  @Override
+  public int getRowIdPageLengthInBytes() {
+    if (rowIdPage != null) {
+      return rowIdPage.length * 2;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public short[] getRowIdRlePage() {
+    return rowIdRlePage;
+  }
+
+  @Override
+  public int getRowIdRlePageLengthInBytes() {
+    if (rowIdRlePage != null) {
+      return rowIdRlePage.length * 2;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override public Object[] getDataPage() {
+    return dataPage;
+  }
+
+  @Override public short[] getDataRlePage() {
+    return new short[0];
+  }
+
+  @Override public int getDataRlePageLengthInBytes() {
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index bbb3434..66fefe0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 /**
  * Below class will be used to for no inverted index
  */
-public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStorage<short[]> {
+public class BlockIndexerStorageForNoInvertedIndexForShort extends BlockIndexerStorage<byte[][]> {
 
   /**
    * column data
@@ -78,14 +78,6 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     }
   }
 
-  private short[] convertToArray(List<Short> list) {
-    short[] shortArray = new short[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
   private byte[][] convertToDataPage(List<byte[]> list) {
     byte[][] shortArray = new byte[list.size()][];
     for (int i = 0; i < shortArray.length; i++) {
@@ -98,7 +90,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return dataRlePage;
   }
 
-  @Override public int getDataRlePageLengthInBytes() {
+  public int getDataRlePageLengthInBytes() {
     if (dataRlePage != null) {
       return dataRlePage.length * 2;
     } else {
@@ -115,7 +107,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override public int getRowIdPageLengthInBytes() {
+  public int getRowIdPageLengthInBytes() {
     return 0;
   }
 
@@ -128,7 +120,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override public int getRowIdRlePageLengthInBytes() {
+  public int getRowIdRlePageLengthInBytes() {
     return 0;
   }
 
@@ -138,4 +130,5 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   public byte[][] getDataPage() {
     return dataPage;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index be6a1a7..f1b9af2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -19,11 +19,12 @@ package org.apache.carbondata.core.datastore.columnar;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.ByteUtil;
 
-public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
 
   private boolean alreadySorted;
 
@@ -42,7 +43,10 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    rleEncodeOnRowId(rowIds);
+    Map<String, short[]> rowIdAndRleRowIdPages =
+        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
+    rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
     if (rleOnData) {
       rleEncodeOnData(dataWithRowId);
     }
@@ -80,66 +84,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
   }
 
   /**
-   * It compresses depends up on the sequence numbers.
-   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]. In
-   * first array the start and end of sequential numbers and second array
-   * keeps the indexes of where sequential numbers starts. If there is no
-   * sequential numbers then the same array it returns with empty second
-   * array.
-   *
-   * @param rowIds
-   */
-  private void rleEncodeOnRowId(short[] rowIds) {
-    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    int k = 0;
-    int i = 1;
-    for (; i < rowIds.length; i++) {
-      if (rowIds[i] - rowIds[i - 1] == 1) {
-        k++;
-      } else {
-        if (k > 0) {
-          map.add(((short) list.size()));
-          list.add(rowIds[i - k - 1]);
-          list.add(rowIds[i - 1]);
-        } else {
-          list.add(rowIds[i - 1]);
-        }
-        k = 0;
-      }
-    }
-    if (k > 0) {
-      map.add(((short) list.size()));
-      list.add(rowIds[i - k - 1]);
-      list.add(rowIds[i - 1]);
-    } else {
-      list.add(rowIds[i - 1]);
-    }
-    int compressionPercentage = (((list.size() + map.size()) * 100) / rowIds.length);
-    if (compressionPercentage > 70) {
-      rowIdPage = rowIds;
-    } else {
-      rowIdPage = convertToArray(list);
-    }
-    if (rowIds.length == rowIdPage.length) {
-      rowIdRlePage = new short[0];
-    } else {
-      rowIdRlePage = convertToArray(map);
-    }
-    if (rowIdPage.length == 2 && rowIdRlePage.length == 1) {
-      alreadySorted = true;
-    }
-  }
-
-  private short[] convertToArray(List<Short> list) {
-    short[] shortArray = new short[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  /**
    * @return the alreadySorted
    */
   public boolean isAlreadySorted() {
@@ -153,7 +97,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return rowIdPage;
   }
 
-  @Override
   public int getRowIdPageLengthInBytes() {
     if (rowIdPage != null) {
       return rowIdPage.length * 2;
@@ -169,7 +112,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return rowIdRlePage;
   }
 
-  @Override
   public int getRowIdRlePageLengthInBytes() {
     if (rowIdRlePage != null) {
       return rowIdRlePage.length * 2;
@@ -234,6 +176,7 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     return shortArray;
   }
 
+  @Override
   public short[] getDataRlePage() {
     return dataRlePage;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
new file mode 100644
index 0000000..affef97
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithRowIdForNoDictionary.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+public class ColumnWithRowIdForNoDictionary<T>
+    implements Comparable<ColumnWithRowIdForNoDictionary<T>> {
+
+  Object column;
+
+  T index;
+
+  DataType dataType;
+
+  ColumnWithRowIdForNoDictionary(Object column, T index, DataType dataType) {
+    this.column = column;
+    this.index = index;
+    this.dataType = dataType;
+  }
+
+  @Override public int compareTo(ColumnWithRowIdForNoDictionary o) {
+    // use the data type based comparator for the no dictionary encoded columns
+    SerializableComparator comparator =
+        org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
+    return comparator.compare(column, o.column);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ColumnWithRowIdForNoDictionary o = (ColumnWithRowIdForNoDictionary)obj;
+    return column.equals(o.column) && getIndex() == o.getIndex();
+  }
+
+  @Override public int hashCode() {
+    return getColumn().hashCode() + getIndex().hashCode();
+  }
+
+  /**
+   * @return the index
+   */
+  public T getIndex() {
+    return index;
+  }
+
+
+  /**
+   * @return the column
+   */
+  public Object getColumn() {
+    return column;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
deleted file mode 100644
index a30ea88..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-public interface IndexStorage<T> {
-
-  T getRowIdPage();
-
-  int getRowIdPageLengthInBytes();
-
-  T getRowIdRlePage();
-
-  int getRowIdRlePageLengthInBytes();
-
-  byte[][] getDataPage();
-
-  T getDataRlePage();
-
-  int getDataRlePageLengthInBytes();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 5b560ab..3067823 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -34,9 +34,11 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -62,6 +64,21 @@ public abstract class ColumnPageEncoder {
   protected abstract ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage);
 
   /**
+   * Get the target data type of the page if encoded
+   *
+   * @param inputPage
+   * @return
+   */
+  public DataType getTargetDataType(ColumnPage inputPage) {
+    ColumnPageEncoderMeta encoderMeta = getEncoderMeta(inputPage);
+    if (null != encoderMeta) {
+      return encoderMeta.getStoreDataType();
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Return a encoded column page by encoding the input page
    * The encoded binary data and metadata are wrapped in encoding column page
    */
@@ -118,17 +135,28 @@ public abstract class ColumnPageEncoder {
   }
 
   private void fillMinMaxIndex(ColumnPage inputPage, DataChunk2 dataChunk) {
-    dataChunk.setMin_max(buildMinMaxIndex(inputPage));
+    dataChunk.setMin_max(buildMinMaxIndex(inputPage, dataChunk.encoders));
   }
 
-  private BlockletMinMaxIndex buildMinMaxIndex(ColumnPage inputPage) {
+  private BlockletMinMaxIndex buildMinMaxIndex(ColumnPage inputPage, List<Encoding> encoders) {
     BlockletMinMaxIndex index = new BlockletMinMaxIndex();
-    byte[] bytes = CarbonUtil.getValueAsBytes(
-        inputPage.getDataType(), inputPage.getStatistics().getMax());
-    ByteBuffer max = ByteBuffer.wrap(
-        bytes);
-    ByteBuffer min = ByteBuffer.wrap(
-        CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMin()));
+    ByteBuffer max;
+    ByteBuffer min;
+    if (CarbonUtil.isEncodedWithMeta(encoders)
+        && inputPage.getColumnSpec().getColumnType() == ColumnType.PLAIN_VALUE) {
+      max = ByteBuffer.wrap(DataTypeUtil
+          .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(inputPage.getStatistics().getMax(),
+              inputPage.getDataType()));
+      min = ByteBuffer.wrap(DataTypeUtil
+          .getMinMaxBytesBasedOnDataTypeForNoDictionaryColumn(inputPage.getStatistics().getMin(),
+              inputPage.getDataType()));
+    } else {
+      byte[] bytes =
+          CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMax());
+      max = ByteBuffer.wrap(bytes);
+      min = ByteBuffer.wrap(
+          CarbonUtil.getValueAsBytes(inputPage.getDataType(), inputPage.getStatistics().getMin()));
+    }
     index.addToMax_values(max);
     index.addToMin_values(min);
     index.addToMin_max_presence(inputPage.getStatistics().writeMinMax());
@@ -187,11 +215,11 @@ public abstract class ColumnPageEncoder {
       } else if ((inputPage.getDataType() == DataTypes.BYTE) || (inputPage.getDataType()
           == DataTypes.SHORT) || (inputPage.getDataType() == DataTypes.INT) || (
           inputPage.getDataType() == DataTypes.LONG)) {
-        return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true)
+        return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true, columnSpec)
             .createEncoder(null);
       } else if ((inputPage.getDataType() == DataTypes.FLOAT) || (inputPage.getDataType()
           == DataTypes.DOUBLE)) {
-        return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true)
+        return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true, columnSpec)
             .createEncoder(null);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 29772d1..993b6b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
@@ -36,6 +37,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Default factory will select encoding base on column page data type and statistics
@@ -57,8 +59,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
   @Override
   public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
     // TODO: add log
-    if (columnSpec instanceof TableSpec.MeasureSpec) {
-      return createEncoderForMeasure(inputPage);
+    // choose the encoding type for measure type and no dictionary primitive type columns
+    if (columnSpec instanceof TableSpec.MeasureSpec || (
+        DataTypeUtil.isPrimitiveColumn(columnSpec.getSchemaDataType())
+            && columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
+      return createEncoderForMeasureOrNoDictionaryPrimitive(inputPage, columnSpec);
     } else {
       if (newWay) {
         return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
@@ -107,7 +112,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
-  private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
+  private ColumnPageEncoder createEncoderForMeasureOrNoDictionaryPrimitive(ColumnPage columnPage,
+      TableSpec.ColumnSpec columnSpec) {
     SimpleStatsResult stats = columnPage.getStatistics();
     DataType dataType = stats.getDataType();
     if (dataType == DataTypes.BOOLEAN) {
@@ -116,11 +122,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
-      return selectCodecByAlgorithmForIntegral(stats, false).createEncoder(null);
+      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec).createEncoder(null);
     } else if (DataTypes.isDecimal(dataType)) {
-      return createEncoderForDecimalDataTypeMeasure(columnPage);
+      return createEncoderForDecimalDataTypeMeasure(columnPage, columnSpec);
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
-      return selectCodecByAlgorithmForFloating(stats, false).createEncoder(null);
+      return selectCodecByAlgorithmForFloating(stats, false, columnSpec).createEncoder(null);
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
     } else {
@@ -128,13 +134,15 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
-  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage) {
+  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage,
+      TableSpec.ColumnSpec columnSpec) {
     DecimalConverterFactory.DecimalConverterType decimalConverterType =
         ((DecimalColumnPage) columnPage).getDecimalConverter().getDecimalConverterType();
     switch (decimalConverterType) {
       case DECIMAL_INT:
       case DECIMAL_LONG:
-        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType)
+        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType,
+            columnSpec)
             .createEncoder(null);
       default:
         return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
@@ -245,7 +253,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
    * size is smaller
    */
   static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats,
-      boolean isComplexPrimitive) {
+      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
 
@@ -260,19 +268,40 @@ public class DefaultEncodingFactory extends EncodingFactory {
         return new DirectCompressCodec(stats.getDataType());
       }
     }
+    boolean isInvertedIndex = isInvertedIndex(isComplexPrimitive, columnSpec);
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
-      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
+          isInvertedIndex);
     } else {
       // choose delta adaptive encoding
-      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
+          isInvertedIndex);
     }
   }
 
+  /**
+   * Check whether the column is sort column and inverted index column
+   *
+   * @param isComplexPrimitive
+   * @param columnSpec
+   * @return
+   */
+  private static boolean isInvertedIndex(boolean isComplexPrimitive,
+      TableSpec.ColumnSpec columnSpec) {
+    boolean isSort;
+    boolean isInvertedIndex = false;
+    if (columnSpec instanceof TableSpec.DimensionSpec && !isComplexPrimitive) {
+      isSort = ((TableSpec.DimensionSpec) columnSpec).isInSortColumns();
+      isInvertedIndex = isSort && ((TableSpec.DimensionSpec) columnSpec).isDoInvertedIndex();
+    }
+    return isInvertedIndex;
+  }
+
   // choose between upscale adaptive encoder or upscale delta adaptive encoder,
   // based on whose target data type size is smaller
   static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
-      boolean isComplexPrimitive) {
+      boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     double maxValue = (double) stats.getMax();
     double minValue = (double) stats.getMin();
@@ -290,7 +319,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
     double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
     if (decimalCount == 0) {
       // short, int, long
-      return selectCodecByAlgorithmForIntegral(stats, false);
+      return selectCodecByAlgorithmForIntegral(stats, false, columnSpec);
     } else if (decimalCount < 0 && !isComplexPrimitive) {
       return new DirectCompressCodec(DataTypes.DOUBLE);
     } else {
@@ -304,11 +333,13 @@ public class DefaultEncodingFactory extends EncodingFactory {
         DataType deltaDataType = compareMinMaxAndSelectDataType(
             (long) (Math.pow(10, decimalCount) * (maxValue - minValue)));
         if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
-          return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats);
+          return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats,
+              isInvertedIndex(isComplexPrimitive, columnSpec));
         } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes() || (
             (isComplexPrimitive) && (adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE
                 .getSizeInBytes()))) {
-          return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
+          return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats,
+              isInvertedIndex(isComplexPrimitive, columnSpec));
         } else {
           return new DirectCompressCodec(DataTypes.DOUBLE);
         }
@@ -321,7 +352,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
    * size is smaller for decimal data type
    */
   static ColumnPageCodec selectCodecByAlgorithmForDecimal(SimpleStatsResult stats,
-      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+      DecimalConverterFactory.DecimalConverterType decimalConverterType,
+      TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType =
         fitMinMaxForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
@@ -343,10 +375,12 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
-      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats,
+          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
     } else {
       // choose delta adaptive encoding
-      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats,
+          isInvertedIndex(columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE, columnSpec));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index d119c8f..920a516 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,7 +66,7 @@ public abstract class EncodingFactory {
    */
   public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
       String compressor) throws IOException {
-    assert (encodings.size() == 1);
+    assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
     byte[] encoderMeta = encoderMetas.get(0).array();
@@ -81,25 +81,27 @@ public abstract class EncodingFactory {
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
-          stats).createDecoder(metadata);
+          stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
-          metadata.getStoreDataType(), stats).createDecoder(metadata);
+          metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX))
+          .createDecoder(metadata);
     } else if (encoding == ADAPTIVE_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
-          stats).createDecoder(metadata);
+          stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
-          metadata.getStoreDataType(), stats).createDecoder(metadata);
+          metadata.getStoreDataType(), stats, encodings.contains(Encoding.INVERTED_INDEX))
+          .createDecoder(metadata);
     } else if (encoding == RLE_INTEGRAL) {
       RLEEncoderMeta metadata = new RLEEncoderMeta();
       metadata.readFields(in);
@@ -132,7 +134,7 @@ public abstract class EncodingFactory {
         dataType == DataTypes.LONG) {
       // create the codec based on algorithm and create decoder by recovering the metadata
       ColumnPageCodec codec =
-          DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false);
+          DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false, spec);
       if (codec instanceof AdaptiveIntegralCodec) {
         AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
@@ -154,7 +156,7 @@ public abstract class EncodingFactory {
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
       // create the codec based on algorithm and create decoder by recovering the metadata
       ColumnPageCodec codec =
-          DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false);
+          DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false, spec);
       if (codec instanceof AdaptiveFloatingCodec) {
         AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
@@ -180,7 +182,7 @@ public abstract class EncodingFactory {
     } else if (dataType == DataTypes.LEGACY_LONG) {
       // In case of older versions like in V1 format it has special datatype to handle
       AdaptiveIntegralCodec adaptiveCodec =
-          new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats);
+          new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
       ColumnPageEncoderMeta meta =
           new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
       return adaptiveCodec.createDecoder(meta);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index ece5cb6..ef7a6a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -17,9 +17,24 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorage;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoDictionary;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.SortState;
 
 /**
  * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply
@@ -38,17 +53,194 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
   // the data type specified in schema
   protected final DataType srcDataType;
 
+  protected boolean isInvertedIndex;
+
+  protected BlockIndexerStorage<Object[]> indexStorage;
+
+  protected ColumnPage encodedPage;
+
   protected AdaptiveCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
+      SimpleStatsResult stats, boolean isInvertedIndex) {
     this.stats = stats;
     this.srcDataType = srcDataType;
     this.targetDataType = targetDataType;
+    this.isInvertedIndex = isInvertedIndex;
   }
 
   public DataType getTargetDataType() {
     return targetDataType;
   }
 
+  /**
+   * Convert the data of the page based on the data type for each row.
+   * While preparing the inverted index for the page,
+   * we need the data based on data type for no dict measure column if adaptive encoding is applied.
+   * This is similar to page.getByteArrayPage()
+   *
+   * @param input column page holding the raw (not yet encoded) values
+   * @return boxed row values, one element per actual row; elements remain null
+   *         if srcDataType matches none of the handled primitive types
+   */
+  public Object[] getPageBasedOnDataType(ColumnPage input) {
+    Object[] data = new Object[input.getActualRowCount()];
+    // Dispatch on the schema data type; BOOLEAN values are read as bytes.
+    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getByte(i);
+      }
+    } else if (srcDataType == DataTypes.SHORT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getShort(i);
+      }
+    } else if (srcDataType == DataTypes.SHORT_INT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getShortInt(i);
+      }
+    } else if (srcDataType == DataTypes.INT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getInt(i);
+      }
+    } else if (srcDataType == DataTypes.LONG) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getLong(i);
+      }
+    } else if (srcDataType == DataTypes.FLOAT) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getFloat(i);
+      }
+    } else if (srcDataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < input.getActualRowCount(); i++) {
+        data[i] = input.getDouble(i);
+      }
+    }
+    return data;
+  }
+
+  /**
+   * Write the given row values into the page, dispatching on the source data type.
+   * This is the inverse of getPageBasedOnDataType() and is used to rebuild a
+   * page from the inverted-index-sorted data before adaptive encoding.
+   *
+   * @param page     destination column page (must have been created for srcDataType)
+   * @param dataPage boxed row values, one element per row
+   */
+  public void putDataToPage(ColumnPage page, Object[] dataPage) {
+    if (srcDataType == DataTypes.BYTE || srcDataType == DataTypes.BOOLEAN) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putByte(i, (byte) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.SHORT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putShort(i, (short) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.SHORT_INT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putShortInt(i, (int) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.INT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putInt(i, (int) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.LONG) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putLong(i, (long) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.FLOAT) {
+      // FLOAT is produced by getPageBasedOnDataType() but was missing here,
+      // silently dropping float data when rebuilding the sorted page.
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putFloat(i, (float) dataPage[i]);
+      }
+    } else if (srcDataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putDouble(i, (double) dataPage[i]);
+      }
+    }
+  }
+
+  /**
+   * Write the inverted index to the page if required.
+   * Appends the row id page (and, when present, its RLE page) after the
+   * encoded data. When no index storage was built (inverted index not
+   * requested), an empty byte array is returned so callers can fall back to
+   * the plain encoded result.
+   *
+   * @param result compressed data page produced by the codec
+   * @return result followed by row id page bytes, or an empty array when
+   *         indexStorage is null
+   * @throws IOException on write failure (not expected for this in-memory stream)
+   */
+  public byte[] writeInvertedIndexIfRequired(byte[] result) throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    if (null != indexStorage) {
+      out.write(result);
+      if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+        // 4-byte length prefix, then the row ids as big-endian shorts.
+        out.writeInt(indexStorage.getRowIdPageLengthInBytes());
+        short[] rowIdPage = (short[]) indexStorage.getRowIdPage();
+        for (short rowId : rowIdPage) {
+          out.writeShort(rowId);
+        }
+        // NOTE(review): RLE page is written only when its length > 0 —
+        // presumably only when RLE actually reduced the size; confirm against
+        // BlockIndexerStorage.
+        if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
+          short[] rowIdRlePage = (short[]) indexStorage.getRowIdRlePage();
+          for (short rowIdRle : rowIdRlePage) {
+            out.writeShort(rowIdRle);
+          }
+        }
+      }
+    }
+    byte[] bytes = stream.toByteArray();
+    stream.close();
+    return bytes;
+  }
+
+  /**
+   * Fill legacy fields if required.
+   * Populates the DataChunk2 metadata (sort state, row id page length and
+   * data page length) that readers use to locate the inverted index bytes
+   * appended by writeInvertedIndexIfRequired().
+   *
+   * @param dataChunk thrift data chunk to populate
+   * @param result    encoded data page; may be null, in which case the data
+   *                  page length is left unset
+   */
+  public void fillLegacyFieldsIfRequired(DataChunk2 dataChunk, byte[] result) {
+    if (null != indexStorage) {
+      SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ?
+          SortState.SORT_EXPLICIT :
+          SortState.SORT_NATIVE;
+      dataChunk.setSort_state(sort);
+      if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+        // Total length = 4-byte length prefix + row id page + RLE page,
+        // matching the layout written in writeInvertedIndexIfRequired().
+        int rowIdPageLength =
+            CarbonCommonConstants.INT_SIZE_IN_BYTE + indexStorage.getRowIdPageLengthInBytes()
+                + indexStorage.getRowIdRlePageLengthInBytes();
+        dataChunk.setRowid_page_length(rowIdPageLength);
+      }
+    } else {
+      dataChunk.setRowid_page_length(0);
+    }
+    if (null != result) {
+      dataChunk.setData_page_length(result.length);
+    }
+  }
+
+  /**
+   * Get the new column page based on the sorted data.
+   * When an inverted index was built, rebuilds the page with the rows in the
+   * order laid out by indexStorage; otherwise the input page is returned
+   * unchanged.
+   *
+   * @param input original column page
+   * @return a newly allocated sorted page, or input itself when no index exists
+   * @throws MemoryException if the new page cannot be allocated
+   */
+  public ColumnPage getSortedColumnPageIfRequired(ColumnPage input) throws MemoryException {
+    if (null != indexStorage) {
+      Object[] dataPage = indexStorage.getDataPage();
+      ColumnPageEncoderMeta columnPageEncoderMeta =
+          new ColumnPageEncoderMeta(input.getColumnSpec(), input.getDataType(),
+              input.getColumnPageEncoderMeta().getCompressorName());
+      ColumnPage columnPage = ColumnPage.newPage(columnPageEncoderMeta, input.getPageSize());
+      putDataToPage(columnPage, dataPage);
+      return columnPage;
+    } else {
+      return input;
+    }
+  }
+
+  /**
+   * Encode the page values (via the supplied converter into targetDataType)
+   * and compress the result.
+   * Side effects: allocates this.encodedPage (callers must free it after use)
+   * and, when isInvertedIndex is set, builds this.indexStorage from the page
+   * data so the sorted order is encoded.
+   *
+   * @param input      page to encode
+   * @param converter  converts each source value into the target data type
+   * @param compressor compressor applied to the converted page
+   * @return compressed bytes of the converted page
+   * @throws MemoryException if the target page cannot be allocated
+   * @throws IOException     on compression failure
+   */
+  public byte[] encodeAndCompressPage(ColumnPage input, ColumnPageValueConverter converter,
+      Compressor compressor) throws MemoryException, IOException {
+    encodedPage = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), targetDataType,
+            input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize());
+    if (isInvertedIndex) {
+      indexStorage =
+          new BlockIndexerStorageForNoDictionary(getPageBasedOnDataType(input), input.getDataType(),
+              isInvertedIndex);
+    }
+    // Sort first (if an index was built), then convert values into encodedPage.
+    ColumnPage columnPage = getSortedColumnPageIfRequired(input);
+    columnPage.convertValue(converter);
+    byte[] result = encodedPage.compress(compressor);
+    return result;
+  }
+
   @Override
   public String toString() {
     return String.format("%s[src type: %s, target type: %s, stats(%s)]",
@@ -58,4 +250,5 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
   protected String debugInfo() {
     return this.toString();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index bb928c2..6d0a8d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -44,18 +45,18 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private Double factor;
   private long max;
 
   public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    return new AdaptiveDeltaFloatingCodec(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    return new AdaptiveDeltaFloatingCodec(srcDataType, targetDataType, stats,
+        isInvertedIndex);
   }
 
   public AdaptiveDeltaFloatingCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
     this.max = (long) (Math.pow(10, stats.getDecimalCount()) * (double) stats.getMax());
   }
@@ -68,20 +69,20 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        input.convertValue(converter);
         Compressor compressor = CompressorFactory.getInstance().getCompressor(
             input.getColumnCompressorName());
-        byte[] result = encodedPage.compress(compressor);
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -89,6 +90,9 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_DELTA_FLOATING);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -98,6 +102,11 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index ac9693d..9ada0bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -45,12 +46,11 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private long max;
 
   public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     if (srcDataType == DataTypes.BYTE) {
       this.max = (byte) stats.getMax();
     } else if (srcDataType == DataTypes.SHORT) {
@@ -78,21 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-
+      byte[] result = null;
+      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        input.convertValue(converter);
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        byte[] result = encodedPage.compress(compressor);
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -106,9 +104,17 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<>();
         encodings.add(Encoding.ADAPTIVE_DELTA_INTEGRAL);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 028fa71..af1e9ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -43,12 +44,11 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
   private Double factor;
 
   public AdaptiveFloatingCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
   }
 
@@ -60,20 +60,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        input.convertValue(converter);
-        byte[] result = encodedPage.compress(compressor);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -81,6 +81,9 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_FLOATING);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -90,6 +93,11 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
+
     };
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index a9cf742..f1c0ea0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -42,11 +43,9 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
-  private ColumnPage encodedPage;
-
   public AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(srcDataType, targetDataType, stats);
+      SimpleStatsResult stats, boolean isInvertedIndex) {
+    super(srcDataType, targetDataType, stats, isInvertedIndex);
   }
 
   @Override
@@ -57,20 +56,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
+      byte[] result = null;
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(
-            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
-                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
-            input.getPageSize());
-        Compressor compressor = CompressorFactory.getInstance().getCompressor(
-            input.getColumnCompressorName());
-        input.convertValue(converter);
-        byte[] result = encodedPage.compress(compressor);
+        Compressor compressor =
+            CompressorFactory.getInstance().getCompressor(input.getColumnCompressorName());
+        result = encodeAndCompressPage(input, converter, compressor);
+        byte[] bytes = writeInvertedIndexIfRequired(result);
         encodedPage.freeMemory();
+        if (bytes.length != 0) {
+          return bytes;
+        }
         return result;
       }
 
@@ -78,6 +77,9 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<Encoding>();
         encodings.add(Encoding.ADAPTIVE_INTEGRAL);
+        if (null != indexStorage && indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
         return encodings;
       }
 
@@ -87,6 +89,10 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
             inputPage.getColumnCompressorName());
       }
 
+      @Override
+      protected void fillLegacyFields(DataChunk2 dataChunk) throws IOException {
+        fillLegacyFieldsIfRequired(dataChunk, result);
+      }
     };
   }