You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/23 07:24:09 UTC

[7/7] carbondata git commit: [CARBONDATA-1371] Support creating decoder based on encoding metadata

[CARBONDATA-1371] Support creating decoder based on encoding metadata

Currently, when writing a column page into a carbondata file, the encoder metadata in the DataChunk2 (page metadata) is not leveraged. In this PR, the encoder metadata is used, so that encoders and decoders can be extended in the future.

Modifications:

When writing column page, support writing codec metadata into DataChunk2, for both dimension and measure column.
When reading a column page, support creating the decoder based on the metadata in the DataChunk2.
schema.thrift file is modified, 5 encodings are added:
	DIRECT_COMPRESS = 6;  // Identifies that a column is encoded using DirectCompressCodec
	ADAPTIVE_INTEGRAL = 7; // Identifies that a column is encoded using AdaptiveIntegralCodec
	ADAPTIVE_DELTA_INTEGRAL = 8; // Identifies that a column is encoded using AdaptiveDeltaIntegralCodec
	RLE_INTEGRAL = 9;     // Identifies that a column is encoded using RLECodec
	DIRECT_STRING = 10;   // Stores string value and string length separately in page data
Backward compatibility is ensured by reusing ValueEncoderMeta; all new codec metadata classes extend ValueEncoderMeta.

The V1 and V2 writers and their corresponding test cases are removed.

CI will fail due to the thrift file modification. Please test it with the `mvn clean verify -Pspark-2.1 -Pbuild-with-format` command.

This closes #1248


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6a4f641
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6a4f641
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6a4f641

Branch: refs/heads/master
Commit: e6a4f6419823e9862503f4325b3cf70ab0e25550
Parents: d3a09e2
Author: Jacky Li <ja...@qq.com>
Authored: Fri Aug 18 14:48:09 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Wed Aug 23 12:53:35 2017 +0530

----------------------------------------------------------------------
 .../datastore/chunk/AbstractRawColumnChunk.java |  18 +-
 .../datastore/chunk/MeasureColumnDataChunk.java |  72 --
 .../chunk/impl/DimensionRawColumnChunk.java     |   5 +-
 .../chunk/impl/MeasureRawColumnChunk.java       |  47 +-
 .../reader/DimensionColumnChunkReader.java      |   3 +-
 .../chunk/reader/MeasureColumnChunkReader.java  |   4 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java | 113 +--
 .../measure/AbstractMeasureChunkReader.java     |   4 +-
 .../AbstractMeasureChunkReaderV2V3Format.java   |  12 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |  18 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |  19 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |  56 +-
 .../chunk/store/ColumnPageWrapper.java          |  92 +++
 .../core/datastore/compression/Compressor.java  |   2 +
 .../compression/CompressorFactory.java          |  14 +-
 .../datastore/compression/SnappyCompressor.java |   5 +
 .../core/datastore/page/ColumnPage.java         | 119 +++-
 .../page/ColumnPageValueConverter.java          |  38 +
 .../core/datastore/page/ComplexColumnPage.java  |   2 +-
 .../core/datastore/page/EncodedTablePage.java   |  84 +--
 .../core/datastore/page/LazyColumnPage.java     |  37 +-
 .../core/datastore/page/PrimitiveCodec.java     |  38 -
 .../datastore/page/SafeFixLengthColumnPage.java |  10 +-
 .../datastore/page/SafeVarLengthColumnPage.java |   5 +
 .../page/UnsafeFixLengthColumnPage.java         |   7 +-
 .../page/UnsafeVarLengthColumnPage.java         |  19 +
 .../datastore/page/VarLengthColumnPageBase.java |  23 +-
 .../page/encoding/AdaptiveCompressionCodec.java |  77 --
 .../page/encoding/AdaptiveIntegralCodec.java    | 220 ------
 .../page/encoding/ColumnPageCodec.java          |  33 +-
 .../page/encoding/ColumnPageDecoder.java        |  32 +
 .../page/encoding/ColumnPageEncoder.java        | 154 ++++
 .../page/encoding/ColumnPageEncoderMeta.java    | 274 +++++++
 .../encoding/ComplexDimensionIndexCodec.java    |  74 --
 .../page/encoding/DefaultEncodingStrategy.java  | 150 ++--
 .../page/encoding/DeltaIntegralCodec.java       | 261 -------
 .../page/encoding/DictDimensionIndexCodec.java  |  65 --
 .../page/encoding/DirectCompressCodec.java      | 144 ----
 .../encoding/DirectDictDimensionIndexCodec.java |  66 --
 .../page/encoding/EncodedColumnPage.java        |  67 +-
 .../page/encoding/EncodedDimensionPage.java     | 141 ----
 .../page/encoding/EncodedMeasurePage.java       |  87 ---
 .../page/encoding/EncodingStrategy.java         | 143 ++--
 .../page/encoding/EncodingStrategyFactory.java  |  33 +
 .../HighCardDictDimensionIndexCodec.java        |  66 --
 .../page/encoding/IndexStorageCodec.java        |  48 --
 .../core/datastore/page/encoding/RLECodec.java  | 419 -----------
 .../page/encoding/adaptive/AdaptiveCodec.java   |  63 ++
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 288 ++++++++
 .../AdaptiveDeltaIntegralEncoderMeta.java       |  47 ++
 .../encoding/adaptive/AdaptiveEncoderMeta.java  |  69 ++
 .../adaptive/AdaptiveIntegralCodec.java         | 250 +++++++
 .../adaptive/AdaptiveIntegralEncoderMeta.java   |  47 ++
 .../adaptive/DeltaIntegralConverter.java        | 213 ++++++
 .../encoding/compress/DirectCompressCodec.java  | 198 ++++++
 .../compress/DirectCompressorEncoderMeta.java   |  57 ++
 .../legacy/ComplexDimensionIndexCodec.java      |  68 ++
 .../legacy/DictDimensionIndexCodec.java         |  74 ++
 .../legacy/DirectDictDimensionIndexCodec.java   |  75 ++
 .../legacy/HighCardDictDimensionIndexCodec.java |  75 ++
 .../dimension/legacy/IndexStorageCodec.java     |  43 ++
 .../dimension/legacy/IndexStorageEncoder.java   |  93 +++
 .../datastore/page/encoding/rle/RLECodec.java   | 412 +++++++++++
 .../page/encoding/rle/RLEEncoderMeta.java       |  60 ++
 .../statistics/ColumnPageStatsCollector.java    |   2 +-
 .../page/statistics/KeyPageStatsCollector.java  | 118 ++++
 .../page/statistics/LVStringStatsCollector.java | 130 ++++
 .../statistics/PrimitivePageStatsCollector.java |  42 +-
 .../page/statistics/SimpleStatsResult.java      |   4 -
 .../page/statistics/TablePageStatistics.java    |  72 +-
 .../statistics/VarLengthPageStatsCollector.java | 121 ----
 .../core/metadata/BlockletInfoColumnar.java     | 336 ---------
 .../core/metadata/CodecMetaFactory.java         |  90 ---
 .../core/metadata/ColumnPageCodecMeta.java      | 317 ---------
 .../metadata/blocklet/datachunk/DataChunk.java  |   7 +-
 .../blocklet/datachunk/PresenceMeta.java        |  64 --
 .../core/metadata/encoder/Encoding.java         |  15 +-
 .../impl/AbstractScannedResultCollector.java    |  16 +-
 .../executer/ExcludeFilterExecuterImpl.java     |  16 +-
 .../executer/IncludeFilterExecuterImpl.java     |  12 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  18 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  20 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  19 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |  12 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  12 +-
 .../core/scan/result/AbstractScannedResult.java |  58 +-
 .../vector/MeasureDataVectorProcessor.java      |  99 ++-
 .../scan/scanner/AbstractBlockletScanner.java   |  10 +-
 .../core/scan/scanner/impl/FilterScanner.java   |  12 +-
 .../util/AbstractDataFileFooterConverter.java   |   8 +-
 .../core/util/CarbonMetadataUtil.java           | 332 +--------
 .../apache/carbondata/core/util/CarbonUtil.java |  49 +-
 .../apache/carbondata/core/util/NodeHolder.java |   4 +-
 .../core/writer/CarbonFooterWriter.java         |  64 --
 .../datastore/page/encoding/RLECodecSuite.java  |  17 +-
 .../impl/RawBasedResultCollectorTest.java       |  10 +-
 .../core/util/CarbonMetadataUtilTest.java       | 164 +----
 .../carbondata/core/util/CarbonTestUtil.java    |  40 ++
 .../core/writer/CarbonFooterWriterTest.java     | 212 ------
 .../scanner/impl/FilterScannerTest.java         |  12 +-
 format/src/main/thrift/schema.thrift            |   5 +
 .../dataload/TestDataLoadWithFileName.scala     |  56 --
 .../TestLoadDataWithHiveSyntaxV1Format.scala    | 708 -------------------
 .../TestLoadDataWithHiveSyntaxV2Format.scala    | 707 ------------------
 .../testsuite/datamap/DataMapWriterSuite.scala  |   2 +-
 .../filterexpr/FilterProcessorTestCase.scala    |  11 +-
 .../partition/TestAlterPartitionTable.scala     |   2 +-
 .../carbondata/CarbonDataSourceSuite.scala      |   3 +-
 .../store/CarbonDataWriterFactory.java          |   5 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   1 -
 .../carbondata/processing/store/TablePage.java  | 102 +--
 .../store/writer/AbstractFactDataWriter.java    |  96 +--
 .../store/writer/CarbonFactDataWriter.java      |   9 +-
 .../writer/v1/CarbonFactDataWriterImplV1.java   | 377 ----------
 .../writer/v2/CarbonFactDataWriterImplV2.java   | 294 --------
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  16 +-
 .../carbon/datastore/BlockIndexStoreTest.java   | 324 +++++----
 117 files changed, 4122 insertions(+), 6682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index eebb382..3345982 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -31,8 +31,6 @@ public abstract class AbstractRawColumnChunk {
 
   protected ByteBuffer rawData;
 
-  private int[] lengths;
-
   private int[] offsets;
 
   private int[] rowCount;
@@ -41,11 +39,11 @@ public abstract class AbstractRawColumnChunk {
 
   protected int blockletId;
 
-  protected int offSet;
+  private int offSet;
 
   protected int length;
 
-  protected DataChunk3 dataChunkV3;
+  private DataChunk3 dataChunkV3;
 
   public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
     this.blockletId = blockletId;
@@ -74,18 +72,6 @@ public abstract class AbstractRawColumnChunk {
     return rawData;
   }
 
-  public void setRawData(ByteBuffer rawData) {
-    this.rawData = rawData;
-  }
-
-  public int[] getLengths() {
-    return lengths;
-  }
-
-  public void setLengths(int[] lengths) {
-    this.lengths = lengths;
-  }
-
   public int[] getOffsets() {
     return offsets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
deleted file mode 100644
index 049527b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.chunk;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
-
-/**
- * Holder for measure column chunk
- * it will have data and its attributes which will
- * be required for processing
- */
-public class MeasureColumnDataChunk {
-
-  /**
-   * measure column page
-   */
-  private ColumnPage measurePage;
-
-  /**
-   * below to hold null value holds this information
-   * about the null value index this will be helpful in case of
-   * to remove the null value while aggregation
-   */
-  private PresenceMeta nullValueIndexHolder;
-
-  /**
-   * @return the measurePage
-   */
-  public ColumnPage getColumnPage() {
-    return measurePage;
-  }
-
-  /**
-   * @param measurePage the column page to set
-   */
-  public void setColumnPage(ColumnPage measurePage) {
-    this.measurePage = measurePage;
-  }
-
-  /**
-   * @return the nullValueIndexHolder
-   */
-  public PresenceMeta getNullValueIndexHolder() {
-    return nullValueIndexHolder;
-  }
-
-  /**
-   * @param nullValueIndexHolder the nullValueIndexHolder to set
-   */
-  public void setNullValueIndexHolder(PresenceMeta nullValueIndexHolder) {
-    this.nullValueIndexHolder = nullValueIndexHolder;
-  }
-
-  public void freeMemory() {
-    this.measurePage.freeMemory();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 048a703..1402e06 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Contains raw dimension data,
@@ -57,7 +58,7 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
         if (dataChunks[i] == null) {
           dataChunks[i] = chunkReader.convertToDimensionChunk(this, i);
         }
-      } catch (Exception e) {
+      } catch (IOException | MemoryException e) {
         throw new RuntimeException(e);
       }
     }
@@ -77,7 +78,7 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
     if (dataChunks[index] == null) {
       try {
         dataChunks[index] = chunkReader.convertToDimensionChunk(this, index);
-      } catch (IOException e) {
+      } catch (IOException | MemoryException e) {
         throw new RuntimeException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 143dd4d..0e0e720 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -21,19 +21,19 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Contains raw measure data
  * 1. The read uncompressed raw data of column chunk with all pages is stored in this instance.
- * 2. The raw data can be converted to processed chunk using convertToMeasureColDataChunk method
+ * 2. The raw data can be converted to processed chunk using convertToColumnPage method
  *  by specifying page number.
  */
 public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
 
-  private MeasureColumnDataChunk[] dataChunks;
+  private ColumnPage[] columnPages;
 
   private MeasureColumnChunkReader chunkReader;
 
@@ -46,53 +46,50 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
   }
 
   /**
-   * Convert all raw data with all pages to processed MeasureColumnDataChunk's
-   * @return
+   * Convert all raw data with all pages to processed ColumnPage
    */
-  public MeasureColumnDataChunk[] convertToMeasureColDataChunks() {
-    if (dataChunks == null) {
-      dataChunks = new MeasureColumnDataChunk[pagesCount];
+  public ColumnPage[] convertToColumnPage() {
+    if (columnPages == null) {
+      columnPages = new ColumnPage[pagesCount];
     }
     for (int i = 0; i < pagesCount; i++) {
       try {
-        if (dataChunks[i] == null) {
-          dataChunks[i] = chunkReader.convertToMeasureChunk(this, i);
+        if (columnPages[i] == null) {
+          columnPages[i] = chunkReader.convertToColumnPage(this, i);
         }
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    return dataChunks;
+    return columnPages;
   }
 
   /**
-   * Convert raw data with specified page number processed to MeasureColumnDataChunk
-   * @param index
-   * @return
+   * Convert raw data with specified `columnIndex` processed to ColumnPage
    */
-  public MeasureColumnDataChunk convertToMeasureColDataChunk(int index) {
-    assert index < pagesCount;
-    if (dataChunks == null) {
-      dataChunks = new MeasureColumnDataChunk[pagesCount];
+  public ColumnPage convertToColumnPage(int columnIndex) {
+    assert columnIndex < pagesCount;
+    if (columnPages == null) {
+      columnPages = new ColumnPage[pagesCount];
     }
 
     try {
-      if (dataChunks[index] == null) {
-        dataChunks[index] = chunkReader.convertToMeasureChunk(this, index);
+      if (columnPages[columnIndex] == null) {
+        columnPages[columnIndex] = chunkReader.convertToColumnPage(this, columnIndex);
       }
     } catch (IOException | MemoryException e) {
       throw new RuntimeException(e);
     }
 
-    return dataChunks[index];
+    return columnPages[columnIndex];
   }
 
   @Override public void freeMemory() {
-    if (null != dataChunks) {
-      for (int i = 0; i < dataChunks.length; i++) {
-        if (dataChunks[i] != null) {
-          dataChunks[i].freeMemory();
+    if (null != columnPages) {
+      for (int i = 0; i < columnPages.length; i++) {
+        if (columnPages[i] != null) {
+          columnPages[i].freeMemory();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index 7110bfa..7b5b9c8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Interface for reading the data chunk
@@ -58,5 +59,5 @@ public interface DimensionColumnChunkReader {
    * @throws IOException
    */
   DimensionColumnDataChunk convertToDimensionChunk(DimensionRawColumnChunk dimensionRawColumnChunk,
-      int pageNumber) throws IOException;
+      int pageNumber) throws IOException, MemoryException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index dba6823..02dc6a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -19,8 +19,8 @@ package org.apache.carbondata.core.datastore.chunk.reader;
 import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -55,7 +55,7 @@ public interface MeasureColumnChunkReader {
    * @return
    * @throws IOException
    */
-  MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+  ColumnPage convertToColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException;
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 9a14a85..8ee020d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -18,15 +18,21 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategyFactory;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -49,6 +55,8 @@ import org.apache.commons.lang.ArrayUtils;
  */
 public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
 
+  private EncodingStrategy strategy = EncodingStrategyFactory.getStrategy();
+
   /**
    * end position of last dimension in carbon data file
    */
@@ -115,8 +123,6 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     rawColumnChunk.setMaxValues(maxValueOfEachPage);
     rawColumnChunk.setMinValues(minValueOfEachPage);
     rawColumnChunk.setRowCount(eachPageLength);
-    rawColumnChunk.setLengths(ArrayUtils
-        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
     rawColumnChunk.setOffsets(ArrayUtils
         .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
     return rawColumnChunk;
@@ -181,8 +187,6 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       dimensionDataChunks[index].setMaxValues(maxValueOfEachPage);
       dimensionDataChunks[index].setMinValues(minValueOfEachPage);
       dimensionDataChunks[index].setRowCount(eachPageLength);
-      dimensionDataChunks[index].setLengths(ArrayUtils
-          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
       dimensionDataChunks[index].setOffsets(ArrayUtils
           .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
       runningLength += currentLength;
@@ -194,72 +198,101 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
   /**
    * Below method will be used to convert the compressed dimension chunk raw data to actual data
    *
-   * @param dimensionRawColumnChunk dimension raw chunk
+   * @param rawColumnPage dimension raw chunk
    * @param pageNumber              number
    * @return DimensionColumnDataChunk
    */
   @Override public DimensionColumnDataChunk convertToDimensionChunk(
-      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
-    byte[] dataPage = null;
-    int[] invertedIndexes = null;
-    int[] invertedIndexesReverse = null;
-    int[] rlePage = null;
-    // data chunk of page
-    DataChunk2 dimensionColumnChunk = null;
+      DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
     // data chunk of blocklet column
-    DataChunk3 dataChunk3 = dimensionRawColumnChunk.getDataChunkV3();
+    DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // get the data buffer
-    ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
-    dimensionColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    ByteBuffer rawData = rawColumnPage.getRawData();
+    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
-    int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
-        .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+    int offset = rawColumnPage.getOffSet() + dimensionChunksLength
+        .get(rawColumnPage.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
     // first read the data and uncompressed it
-    dataPage = COMPRESSOR
-        .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
-    copySourcePoint += dimensionColumnChunk.data_page_length;
+    return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
+  }
+
+  private DimensionColumnDataChunk decodeDimensionByMeta(DataChunk2 pageMetadata,
+      ByteBuffer pageData, int offset)
+      throws IOException, MemoryException {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
+    ColumnPageDecoder decoder = strategy.createDecoder(encodings, encoderMetas);
+    ColumnPage decodedPage = decoder.decode(
+        pageData.array(), offset, pageMetadata.data_page_length);
+    return new ColumnPageWrapper(decodedPage);
+  }
+
+  private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    if (encodings != null && encodings.size() == 1) {
+      Encoding encoding = encodings.get(0);
+      switch (encoding) {
+        case DIRECT_COMPRESS:
+        case DIRECT_STRING:
+          return true;
+      }
+    }
+    return false;
+  }
+
+  private DimensionColumnDataChunk decodeDimension(DimensionRawColumnChunk rawColumnPage,
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
+      throws IOException, MemoryException {
+    if (isEncodedWithMeta(pageMetadata)) {
+      return decodeDimensionByMeta(pageMetadata, pageData, offset);
+    } else {
+      // following code is for backward compatibility
+      return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
+    }
+  }
+
+  private DimensionColumnDataChunk decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) {
+    byte[] dataPage;
+    int[] rlePage;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
+    offset += pageMetadata.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
-    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+    if (hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
       invertedIndexes = CarbonUtil
-          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, rawData,
-              copySourcePoint);
-      copySourcePoint += dimensionColumnChunk.rowid_page_length;
+          .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+      offset += pageMetadata.rowid_page_length;
       // get the reverse index
       invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
     }
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
-    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+    if (hasEncoding(pageMetadata.encoders, Encoding.RLE)) {
       rlePage =
-          CarbonUtil.getIntArray(rawData, copySourcePoint, dimensionColumnChunk.rle_page_length);
+          CarbonUtil.getIntArray(pageData, offset, pageMetadata.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
-          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
-      rlePage = null;
+          eachColumnValueSize[rawColumnPage.getBlockletId()]);
     }
-    // fill chunk attributes
+
     DimensionColumnDataChunk columnDataChunk = null;
 
-    if (dimensionColumnChunk.isRowMajor()) {
-      // to store fixed length column chunk values
-      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage,
-          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()],
-          dimensionRawColumnChunk.getRowCount()[pageNumber]);
-    }
     // if no dictionary column then first create a no dictionary column chunk
     // and set to data chunk instance
-    else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+    if (!hasEncoding(pageMetadata.encoders, Encoding.DICTIONARY)) {
       columnDataChunk =
           new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
-              dimensionRawColumnChunk.getRowCount()[pageNumber]);
+              pageMetadata.getNumberOfRowsInpage());
     } else {
       // to store fixed length column chunk values
       columnDataChunk =
           new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
-              dimensionRawColumnChunk.getRowCount()[pageNumber],
-              eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+              pageMetadata.getNumberOfRowsInpage(),
+              eachColumnValueSize[rawColumnPage.getBlockletId()]);
     }
     return columnDataChunk;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 5605554..80c2be0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -17,15 +17,15 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
-import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategyFactory;
 
 /**
  * Measure block reader abstract class
  */
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
 
-  protected EncodingStrategy strategy = new DefaultEncodingStrategy();
+  protected EncodingStrategy strategy = EncodingStrategyFactory.getStrategy();
 
   /**
    * file path from which blocks will be read

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index 049aba9..2239a2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -22,9 +22,9 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
 
 /**
  * Abstract class for V2, V3 format measure column reader
@@ -99,13 +99,11 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
    * @param presentMetadataThrift
    * @return wrapper presence meta
    */
-  protected PresenceMeta getPresenceMeta(
+  protected BitSet getNullBitSet(
       org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
-    return presenceMeta;
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    return BitSet.valueOf(
+        compressor.unCompressByte(presentMetadataThrift.getPresent_bit_stream()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index f3c7067..257ae71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -21,11 +21,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -92,21 +91,16 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
   }
 
   @Override
-  public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+  public ColumnPage convertToColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException {
     int blockIndex = measureRawColumnChunk.getBlockletId();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-    ColumnPageCodec codec = strategy.newCodec(meta);
-    ColumnPage page = codec.decode(measureRawColumnChunk.getRawData().array(),
+    ColumnPageDecoder codec = strategy.createDecoderLegacy(meta);
+    ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
+    decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());
 
-    // create and set the data chunk
-    MeasureColumnDataChunk decodedChunk = new MeasureColumnDataChunk();
-    decodedChunk.setColumnPage(page);
-    // set the enun value indexes
-    decodedChunk
-        .setNullValueIndexHolder(dataChunk.getNullValueIndexForColumn());
-    return decodedChunk;
+    return decodedPage;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index a1c1267..20b910d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -21,11 +21,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -48,7 +47,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     super(blockletInfo, filePath);
   }
 
-  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+  @Override
+  public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
     int dataLength = 0;
     if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
@@ -108,9 +108,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     return dataChunks;
   }
 
-  public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
+  public ColumnPage convertToColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException {
-    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     int copyPoint = measureRawColumnChunk.getOffSet();
     int blockIndex = measureRawColumnChunk.getBlockletId();
     ByteBuffer rawData = measureRawColumnChunk.getRawData();
@@ -121,12 +120,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     }
 
     ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-
-    // set the data chunk
-    datChunk.setColumnPage(page);
-    // set the enun value indexes
-    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
-    return datChunk;
+    page.setNullBits(getNullBitSet(measureColumnChunk.presence));
+    return page;
   }
 
   protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
@@ -136,7 +131,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     byte[] encodedMeta = encoder_meta.get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta);
-    ColumnPageCodec codec = strategy.newCodec(meta);
+    ColumnPageDecoder codec = strategy.createDecoderLegacy(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index a893ab0..6f126a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -21,17 +21,16 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.Encoding;
 
 import org.apache.commons.lang.ArrayUtils;
 
@@ -115,8 +114,6 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     rawColumnChunk.setMaxValues(maxValueOfEachPage);
     rawColumnChunk.setMinValues(minValueOfEachPage);
     rawColumnChunk.setRowCount(eachPageLength);
-    rawColumnChunk.setLengths(ArrayUtils
-        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
     rawColumnChunk.setOffsets(ArrayUtils
         .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
     return rawColumnChunk;
@@ -184,8 +181,6 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
       measureRawColumnChunk.setMaxValues(maxValueOfEachPage);
       measureRawColumnChunk.setMinValues(minValueOfEachPage);
       measureRawColumnChunk.setRowCount(eachPageLength);
-      measureRawColumnChunk.setLengths(ArrayUtils
-          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
       measureRawColumnChunk.setOffsets(ArrayUtils
           .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
       measureDataChunk[index] = measureRawColumnChunk;
@@ -198,45 +193,38 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
   /**
    * Below method will be used to convert the compressed measure chunk raw data to actual data
    *
-   * @param measureRawColumnChunk measure raw chunk
+   * @param rawColumnPage measure raw chunk
    * @param pageNumber            number
    * @return DimensionColumnDataChunk
    */
   @Override
-  public MeasureColumnDataChunk convertToMeasureChunk(
-      MeasureRawColumnChunk measureRawColumnChunk, int pageNumber)
+  public ColumnPage convertToColumnPage(
+      MeasureRawColumnChunk rawColumnPage, int pageNumber)
       throws IOException, MemoryException {
-    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     // data chunk of blocklet column
-    DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
+    DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // data chunk of page
-    DataChunk2 measureColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
-    int copyPoint = measureRawColumnChunk.getOffSet() + measureColumnChunkLength
-        .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
-    ColumnPage decodedPage = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-
-    // set the data chunk
-    datChunk.setColumnPage(decodedPage);
-    // set the null value indexes
-    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
-    return datChunk;
+    int offset = rawColumnPage.getOffSet() +
+        measureColumnChunkLength.get(rawColumnPage.getBlockletId()) +
+        dataChunk3.getPage_offset().get(pageNumber);
+    ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnPage.getRawData(), offset);
+    decodedPage.setNullBits(getNullBitSet(pageMetadata.presence));
+    return decodedPage;
   }
 
-  protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
-      DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException, IOException {
-    List<ByteBuffer> encoder_meta = measureColumnChunk.getEncoder_meta();
-    // for measure, it should have only one ValueEncoderMeta
-    assert (encoder_meta.size() > 0);
-    byte[] encodedMeta = encoder_meta.get(0).array();
-
-    ColumnPageCodecMeta meta = new ColumnPageCodecMeta();
-    meta.deserialize(encodedMeta);
-    ColumnPageCodec codec = strategy.newCodec(meta);
-    byte[] rawData = measureRawColumnChunk.getRawData().array();
-    return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
+  /**
+   * Decode measure column page with page header and raw data starting from offset
+   */
+  private ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
+      throws MemoryException, IOException {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
+    ColumnPageDecoder codec = strategy.createDecoder(encodings, encoderMetas);
+    return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
new file mode 100644
index 0000000..5f09ffa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store;
+
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
+public class ColumnPageWrapper implements DimensionColumnDataChunk {
+
+  private ColumnPage columnPage;
+
+  public ColumnPageWrapper(ColumnPage columnPage) {
+    this.columnPage = columnPage;
+  }
+
+  @Override
+  public int fillChunkData(byte[] data, int offset, int columnIndex,
+      KeyStructureInfo restructuringInfo) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
+      KeyStructureInfo restructuringInfo) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public byte[] getChunkData(int columnIndex) {
+    return columnPage.getBytes(columnIndex);
+  }
+
+  @Override
+  public int getInvertedIndex(int rowId) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public boolean isNoDicitionaryColumn() {
+    return true;
+  }
+
+  @Override
+  public int getColumnValueSize() {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public boolean isExplicitSorted() {
+    return false;
+  }
+
+  @Override
+  public int compareTo(int index, byte[] compareValue) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public void freeMemory() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index 2bc8678..cd31984 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 public interface Compressor {
 
+  String getName();
+
   byte[] compressByte(byte[] unCompInput);
 
   byte[] unCompressByte(byte[] compInput);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
index 8234891..18f6252 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
@@ -24,14 +24,14 @@ public class CompressorFactory {
 
   private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();
 
-  private final Compressor compressor;
+  private final Compressor snappyCompressor;
 
   private CompressorFactory() {
     String compressorType = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
     switch (compressorType) {
       case "snappy":
-        compressor = new SnappyCompressor();
+        snappyCompressor = new SnappyCompressor();
         break;
       default:
         throw new RuntimeException(
@@ -44,7 +44,15 @@ public class CompressorFactory {
   }
 
   public Compressor getCompressor() {
-    return compressor;
+    return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
+  }
+
+  public Compressor getCompressor(String name) {
+    if (name.equalsIgnoreCase("snappy")) {
+      return snappyCompressor;
+    } else {
+      throw new UnsupportedOperationException(name + " compressor is not supported");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index f8a2f4f..4022680 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -49,6 +49,11 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
+  @Override
+  public String getName() {
+    return "snappy";
+  }
+
   @Override public byte[] compressByte(byte[] unCompInput) {
     try {
       return Snappy.rawCompress(unCompInput, unCompInput.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 8b44f07..2a78363 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -19,10 +19,12 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
@@ -45,10 +47,13 @@ public abstract class ColumnPage {
   protected int scale;
   protected int precision;
 
+  // Bit at index rowId is set to 1 when the value at that row is null
+  private BitSet nullBitSet;
+
   // statistics collector for this column page
   private ColumnPageStatsCollector statsCollector;
 
-  protected DecimalConverterFactory.DecimalConverter decimalConverter;
+  DecimalConverterFactory.DecimalConverter decimalConverter;
 
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
@@ -59,6 +64,7 @@ public abstract class ColumnPage {
     this.pageSize = pageSize;
     this.scale = scale;
     this.precision = precision;
+    this.nullBitSet = new BitSet(pageSize);
     if (dataType == DECIMAL) {
       decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
     }
@@ -68,8 +74,39 @@ public abstract class ColumnPage {
     return dataType;
   }
 
-  public Object getStatistics() {
-    return statsCollector.getPageStats();
+  private static final SimpleStatsResult statsForComplexType = new SimpleStatsResult() {
+    @Override public Object getMin() {
+      return new byte[0];
+    }
+
+    @Override public Object getMax() {
+      return new byte[0];
+    }
+
+    @Override public int getDecimalPoint() {
+      return 0;
+    }
+
+    @Override public DataType getDataType() {
+      return BYTE_ARRAY;
+    }
+
+    @Override public int getScale() {
+      return 0;
+    }
+
+    @Override public int getPrecision() {
+      return 0;
+    }
+  };
+
+  public SimpleStatsResult getStatistics() {
+    if (statsCollector != null) {
+      return statsCollector.getPageStats();
+    } else {
+      // TODO: for a sub-column of a complex type, there are no stats yet; return a dummy result
+      return statsForComplexType;
+    }
   }
 
   public int getPageSize() {
@@ -114,31 +151,19 @@ public abstract class ColumnPage {
     }
   }
 
-  public static ColumnPage newVarLengthPage(DataType dataType, int pageSize) {
-    return newVarLengthPage(dataType, pageSize, -1, -1);
-  }
-
-  private static ColumnPage newVarLengthPage(DataType dataType, int pageSize, int scale,
-      int precision) {
-    if (unsafe) {
-      try {
-        return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
-      } catch (MemoryException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision);
-    }
+  public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
+    return newPage(dataType, pageSize, -1, -1);
   }
 
-  public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
-    return newPage(dataType, pageSize, 0, 0);
+  public static ColumnPage newDecimalPage(DataType dataType, int pageSize, int scale, int precision)
+    throws MemoryException {
+    return newPage(dataType, pageSize, scale, precision);
   }
 
   /**
    * Create a new page of dataType and number of row = pageSize
    */
-  public static ColumnPage newPage(DataType dataType, int pageSize, int scale, int precision)
+  private static ColumnPage newPage(DataType dataType, int pageSize, int scale, int precision)
       throws MemoryException {
     ColumnPage instance;
     if (unsafe) {
@@ -150,9 +175,10 @@ public abstract class ColumnPage {
         case LONG:
         case FLOAT:
         case DOUBLE:
-          instance = new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision);
+          instance = new UnsafeFixLengthColumnPage(dataType, pageSize, -1, -1);
           break;
         case DECIMAL:
+        case STRING:
         case BYTE_ARRAY:
           instance = new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
           break;
@@ -185,8 +211,9 @@ public abstract class ColumnPage {
         case DECIMAL:
           instance = newDecimalPage(new byte[pageSize][], scale, precision);
           break;
+        case STRING:
         case BYTE_ARRAY:
-          instance = new SafeVarLengthColumnPage(dataType, pageSize, scale, precision);
+          instance = new SafeVarLengthColumnPage(dataType, pageSize, -1, -1);
           break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -195,6 +222,12 @@ public abstract class ColumnPage {
     return instance;
   }
 
+  public static ColumnPage wrapByteArrayPage(byte[][] byteArray) {
+    ColumnPage columnPage = createPage(BYTE_ARRAY, byteArray.length, -1, -1);
+    columnPage.setByteArrayPage(byteArray);
+    return columnPage;
+  }
+
   private static ColumnPage newBytePage(byte[] byteData) {
     ColumnPage columnPage = createPage(BYTE, byteData.length,  -1, -1);
     columnPage.setBytePage(byteData);
@@ -248,9 +281,9 @@ public abstract class ColumnPage {
     return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray, scale, precision);
   }
 
-  private static ColumnPage newVarLengthPage(byte[] lvEncodedByteArray, int scale, int precision)
+  private static ColumnPage newLVBytesPage(byte[] lvEncodedByteArray)
       throws MemoryException {
-    return VarLengthColumnPageBase.newVarLengthColumnPage(lvEncodedByteArray, scale, precision);
+    return VarLengthColumnPageBase.newLVBytesColumnPage(lvEncodedByteArray);
   }
 
   /**
@@ -305,6 +338,7 @@ public abstract class ColumnPage {
     if (value == null) {
       putNull(rowId);
       statsCollector.updateNull(rowId);
+      nullBitSet.set(rowId);
       return;
     }
     switch (dataType) {
@@ -332,6 +366,7 @@ public abstract class ColumnPage {
         putDecimal(rowId, (BigDecimal) value);
         statsCollector.update((BigDecimal) value);
         break;
+      case STRING:
       case BYTE_ARRAY:
         putBytes(rowId, (byte[]) value);
         statsCollector.update((byte[]) value);
@@ -456,6 +491,11 @@ public abstract class ColumnPage {
   public abstract BigDecimal getDecimal(int rowId);
 
   /**
+   * Get byte array at rowId
+   */
+  public abstract byte[] getBytes(int rowId);
+
+  /**
    * Get byte value page
    */
   public abstract byte[] getBytePage();
@@ -508,7 +548,7 @@ public abstract class ColumnPage {
   /**
    * Encode the page data by codec (Visitor)
    */
-  public abstract void encode(PrimitiveCodec codec);
+  public abstract void convertValue(ColumnPageValueConverter codec);
 
   /**
    * Compress page data using specified compressor
@@ -539,10 +579,11 @@ public abstract class ColumnPage {
   }
 
   /**
-   * Decompress data and create a column page using the decompressed data
+   * Decompress data and create a column page using the decompressed data,
+   * except for decimal page
    */
   public static ColumnPage decompress(Compressor compressor, DataType dataType,
-      byte[] compressedData, int offset, int length, int scale, int precision)
+      byte[] compressedData, int offset, int length)
       throws MemoryException {
     switch (dataType) {
       case BYTE:
@@ -566,15 +607,29 @@ public abstract class ColumnPage {
       case DOUBLE:
         double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
         return newDoublePage(doubleData);
-      case DECIMAL:
-        byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newDecimalPage(lvEncodedBytes, scale, precision);
       case BYTE_ARRAY:
         byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newVarLengthPage(lvVarBytes, scale, precision);
+        return newLVBytesPage(lvVarBytes);
       default:
         throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType);
     }
   }
 
+  /**
+   * Decompress decimal data and create a column page
+   */
+  public static ColumnPage decompressDecimalPage(Compressor compressor,
+      byte[] compressedData, int offset, int length, int scale, int precision)
+      throws MemoryException {
+    byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
+    return newDecimalPage(lvEncodedBytes, scale, precision);
+  }
+
+  public BitSet getNullBits() {
+    return nullBitSet;
+  }
+
+  public void setNullBits(BitSet nullBitSet) {
+    this.nullBitSet = nullBitSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
new file mode 100644
index 0000000..53ad956
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+// Transformation type that can be applied to ColumnPage
+public interface ColumnPageValueConverter {
+  void encode(int rowId, byte value);
+  void encode(int rowId, short value);
+  void encode(int rowId, int value);
+  void encode(int rowId, long value);
+  void encode(int rowId, float value);
+  void encode(int rowId, double value);
+
+  long decodeLong(byte value);
+  long decodeLong(short value);
+  long decodeLong(int value);
+  double decodeDouble(byte value);
+  double decodeDouble(short value);
+  double decodeDouble(int value);
+  double decodeDouble(long value);
+  double decodeDouble(float value);
+  double decodeDouble(double value);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 5698e39..8cb18e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -43,7 +43,7 @@ public class ComplexColumnPage {
     this.depth = depth;
     complexColumnData = new ArrayList<>(depth);
     for (int i = 0; i < depth; i++) {
-      complexColumnData.add(new ArrayList<byte[]>(pageSize));
+      complexColumnData.add(new ArrayList<byte[]>());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
index 0aac1d9..dc0a388 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java
@@ -17,13 +17,10 @@
 
 package org.apache.carbondata.core.datastore.page;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import java.io.IOException;
+
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
 import org.apache.carbondata.core.datastore.page.key.TablePageKey;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Table page that after encoding and compression.
@@ -31,10 +28,10 @@ import org.apache.carbondata.core.util.CarbonUtil;
 public class EncodedTablePage {
 
   // encoded data and metadata for each dimension column
-  private EncodedDimensionPage[] dimensions;
+  private EncodedColumnPage[] dimensionPages;
 
   // encoded data and metadata for each measure column
-  private EncodedMeasurePage[] measures;
+  private EncodedColumnPage[] measurePages;
 
   // key of this page
   private TablePageKey pageKey;
@@ -49,59 +46,40 @@ public class EncodedTablePage {
     EncodedTablePage page = new EncodedTablePage();
     page.pageSize = 0;
     page.encodedSize = 0;
-    page.measures = new EncodedMeasurePage[0];
-    page.dimensions = new EncodedDimensionPage[0];
+    page.dimensionPages = new EncodedColumnPage[0];
+    page.measurePages = new EncodedColumnPage[0];
     return page;
   }
 
   public static EncodedTablePage newInstance(int pageSize,
-      EncodedDimensionPage[] dimensions, EncodedMeasurePage[] measures,
-      TablePageKey tablePageKey) {
-    return new EncodedTablePage(pageSize, dimensions, measures, tablePageKey);
+      EncodedColumnPage[] dimensionPages, EncodedColumnPage[] measurePages,
+      TablePageKey tablePageKey) throws IOException {
+    return new EncodedTablePage(pageSize, dimensionPages, measurePages, tablePageKey);
   }
 
   private EncodedTablePage() {
   }
 
-  private EncodedTablePage(int pageSize, EncodedDimensionPage[] encodedDimensions,
-      EncodedMeasurePage[] encodedMeasures, TablePageKey tablePageKey) {
-    this.dimensions = encodedDimensions;
-    this.measures = encodedMeasures;
+  private EncodedTablePage(int pageSize,
+      EncodedColumnPage[] dimensionPages, EncodedColumnPage[] measurePages,
+      TablePageKey tablePageKey) throws IOException {
+    this.dimensionPages = dimensionPages;
+    this.measurePages = measurePages;
     this.pageSize = pageSize;
     this.pageKey = tablePageKey;
-    this.encodedSize = calculatePageSize(encodedDimensions, encodedMeasures);
+    this.encodedSize = calculatePageSize(dimensionPages, measurePages);
   }
 
   // return size in bytes of this encoded page
-  private int calculatePageSize(EncodedDimensionPage[] encodedDimensions,
-      EncodedMeasurePage[] encodedMeasures) {
+  private int calculatePageSize(EncodedColumnPage[] dimensionPages,
+      EncodedColumnPage[] measurePages) throws IOException {
     int size = 0;
-    int totalEncodedDimensionDataLength = 0;
-    int totalEncodedMeasuredDataLength = 0;
-    // add row id index length
-    for (EncodedDimensionPage dimension : dimensions) {
-      IndexStorage indexStorage = dimension.getIndexStorage();
-      if (!indexStorage.isAlreadySorted()) {
-        size += indexStorage.getRowIdPageLengthInBytes() +
-            indexStorage.getRowIdRlePageLengthInBytes() +
-            CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      }
-      if (indexStorage.getDataRlePageLengthInBytes() > 0) {
-        size += indexStorage.getDataRlePageLengthInBytes();
-      }
-      totalEncodedDimensionDataLength += dimension.getEncodedData().length;
-    }
-    for (EncodedColumnPage measure : measures) {
-      size += measure.getEncodedData().length;
-    }
-
-    for (EncodedDimensionPage encodedDimension : encodedDimensions) {
-      size += CarbonUtil.getByteArray(encodedDimension.getDataChunk2()).length;
+    for (EncodedColumnPage dimensionPage : dimensionPages) {
+      size += dimensionPage.getTotalSerializedSize();
     }
-    for (EncodedMeasurePage encodedMeasure : encodedMeasures) {
-      size += CarbonUtil.getByteArray(encodedMeasure.getDataChunk2()).length;
+    for (EncodedColumnPage measurePage : measurePages) {
+      size += measurePage.getTotalSerializedSize();
     }
-    size += totalEncodedDimensionDataLength + totalEncodedMeasuredDataLength;
     return size;
   }
 
@@ -114,30 +92,30 @@ public class EncodedTablePage {
   }
 
   public int getNumDimensions() {
-    return dimensions.length;
+    return dimensionPages.length;
   }
 
   public int getNumMeasures() {
-    return measures.length;
+    return measurePages.length;
   }
 
   public TablePageKey getPageKey() {
     return pageKey;
   }
 
-  public EncodedMeasurePage getMeasure(int measureIndex) {
-    return measures[measureIndex];
+  public EncodedColumnPage getDimension(int dimensionIndex) {
+    return dimensionPages[dimensionIndex];
   }
 
-  public EncodedMeasurePage[] getMeasures() {
-    return measures;
+  public EncodedColumnPage getMeasure(int measureIndex) {
+    return measurePages[measureIndex];
   }
 
-  public EncodedDimensionPage getDimension(int dimensionIndex) {
-    return dimensions[dimensionIndex];
+  public EncodedColumnPage[] getDimensions() {
+    return dimensionPages;
   }
 
-  public EncodedDimensionPage[] getDimensions() {
-    return dimensions;
+  public EncodedColumnPage[] getMeasures() {
+    return measurePages;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 2864e80..80e508a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -29,35 +29,35 @@ public class LazyColumnPage extends ColumnPage {
   private ColumnPage columnPage;
 
   // encode that will apply to page data in getXXX
-  private PrimitiveCodec codec;
+  private ColumnPageValueConverter converter;
 
-  private LazyColumnPage(ColumnPage columnPage, PrimitiveCodec codec) {
+  private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) {
     super(columnPage.getDataType(), columnPage.getPageSize(), columnPage.scale,
         columnPage.precision);
     this.columnPage = columnPage;
-    this.codec = codec;
+    this.converter = converter;
   }
 
-  public static ColumnPage newPage(ColumnPage columnPage, PrimitiveCodec codec) {
+  public static ColumnPage newPage(ColumnPage columnPage, ColumnPageValueConverter codec) {
     return new LazyColumnPage(columnPage, codec);
   }
 
   @Override
   public String toString() {
-    return String.format("[encode: %s, data type: %s", codec, columnPage.getDataType());
+    return String.format("[converter: %s, data type: %s", converter, columnPage.getDataType());
   }
 
   @Override
   public long getLong(int rowId) {
     switch (columnPage.getDataType()) {
       case BYTE:
-        return codec.decodeLong(columnPage.getByte(rowId));
+        return converter.decodeLong(columnPage.getByte(rowId));
       case SHORT:
-        return codec.decodeLong(columnPage.getShort(rowId));
+        return converter.decodeLong(columnPage.getShort(rowId));
       case SHORT_INT:
-        return codec.decodeLong(columnPage.getShortInt(rowId));
+        return converter.decodeLong(columnPage.getShortInt(rowId));
       case INT:
-        return codec.decodeLong(columnPage.getInt(rowId));
+        return converter.decodeLong(columnPage.getInt(rowId));
       case LONG:
         return columnPage.getLong(rowId);
       default:
@@ -69,17 +69,17 @@ public class LazyColumnPage extends ColumnPage {
   public double getDouble(int rowId) {
     switch (columnPage.getDataType()) {
       case BYTE:
-        return codec.decodeDouble(columnPage.getByte(rowId));
+        return converter.decodeDouble(columnPage.getByte(rowId));
       case SHORT:
-        return codec.decodeDouble(columnPage.getShort(rowId));
+        return converter.decodeDouble(columnPage.getShort(rowId));
       case SHORT_INT:
-        return codec.decodeDouble(columnPage.getShortInt(rowId));
+        return converter.decodeDouble(columnPage.getShortInt(rowId));
       case INT:
-        return codec.decodeDouble(columnPage.getInt(rowId));
+        return converter.decodeDouble(columnPage.getInt(rowId));
       case LONG:
-        return codec.decodeDouble(columnPage.getLong(rowId));
+        return converter.decodeDouble(columnPage.getLong(rowId));
       case FLOAT:
-        return codec.decodeDouble(columnPage.getFloat(rowId));
+        return converter.decodeDouble(columnPage.getFloat(rowId));
       case DOUBLE:
         return columnPage.getDouble(rowId);
       default:
@@ -98,6 +98,11 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
+  public byte[] getBytes(int rowId) {
+    return columnPage.getBytes(rowId);
+  }
+
+  @Override
   public byte[] getBytePage() {
     throw new UnsupportedOperationException("internal error");
   }
@@ -153,7 +158,7 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
-  public void encode(PrimitiveCodec codec) {
+  public void convertValue(ColumnPageValueConverter codec) {
     throw new UnsupportedOperationException("internal error");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
deleted file mode 100644
index 31eb7ac..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page;
-
-// Transformation type that can be applied to ColumnPage
-public interface PrimitiveCodec {
-  void encode(int rowId, byte value);
-  void encode(int rowId, short value);
-  void encode(int rowId, int value);
-  void encode(int rowId, long value);
-  void encode(int rowId, float value);
-  void encode(int rowId, double value);
-
-  long decodeLong(byte value);
-  long decodeLong(short value);
-  long decodeLong(int value);
-  double decodeDouble(byte value);
-  double decodeDouble(short value);
-  double decodeDouble(int value);
-  double decodeDouble(long value);
-  double decodeDouble(float value);
-  double decodeDouble(double value);
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 9bd85e6..ca5db95 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -170,6 +170,11 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override
+  public byte[] getBytes(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
   /**
    * Get byte value page
    */
@@ -312,7 +317,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    * @param codec type of transformation
    */
   @Override
-  public void encode(PrimitiveCodec codec) {
+  public void convertValue(ColumnPageValueConverter codec) {
     switch (dataType) {
       case BYTE:
         for (int i = 0; i < pageSize; i++) {
@@ -345,7 +350,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
         }
         break;
       default:
-        throw new UnsupportedOperationException("not support encode on " + dataType + " page");
+        throw new UnsupportedOperationException("not support value conversion on " +
+            dataType + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 63291f3..ac2bfdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -57,6 +57,11 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override
+  public byte[] getBytes(int rowId) {
+    return byteArrayData[rowId];
+  }
+
+  @Override
   public void setByteArrayPage(byte[][] byteArray) {
     byteArrayData = byteArray;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 06d952d..2797104 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -183,6 +183,11 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override
+  public byte[] getBytes(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
   @Override public byte[] getDecimalPage() {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
@@ -322,7 +327,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public void encode(PrimitiveCodec codec) {
+  public void convertValue(ColumnPageValueConverter codec) {
     int pageSize = getPageSize();
     switch (dataType) {
       case BYTE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index c7d21ef..1c18fc7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -121,6 +121,16 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
         baseAddress, baseOffset + rowOffset[rowId], length);
   }
 
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    if (totalLength != 0) {
+      throw new IllegalStateException("page is not empty");
+    }
+    for (int i = 0; i < byteArray.length; i++) {
+      putBytes(i, byteArray[i]);
+    }
+  }
+
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
     putBytes(rowId, decimalConverter.convert(decimal));
   }
@@ -136,6 +146,15 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override
+  public byte[] getBytes(int rowId) {
+    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+    return bytes;
+  }
+
+  @Override
   public byte[][] getByteArrayPage() {
     byte[][] bytes = new byte[pageSize][];
     for (int rowId = 0; rowId < pageSize; rowId++) {