You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/13 11:39:20 UTC

[3/3] carbondata git commit: [CARBONDATA-1400] Fix bug of array column out of bound when writing carbondata file

[CARBONDATA-1400] Fix bug of array column out of bound when writing carbondata file

If there is a big array in the input csv file, loading into a carbondata table may throw ArrayIndexOutOfBoundsException because the data exceeds the page size (32000 rows)

This PR fixes it by changing the complex column encoding to the direct compress encoding (DirectCompressCodec)
This PR adds a test case that loads input data containing a big array

This closes #1273


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c1ddbf2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c1ddbf2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c1ddbf2

Branch: refs/heads/master
Commit: 8c1ddbf2a6ba74a0a6d1333d95d0f6ad70297c01
Parents: b414393
Author: Jacky Li <ja...@qq.com>
Authored: Tue Sep 12 09:33:20 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Wed Sep 13 17:08:40 2017 +0530

----------------------------------------------------------------------
 .../cache/dictionary/ColumnDictionaryInfo.java  |   5 -
 .../carbondata/core/datastore/ColumnType.java   |  51 ++++
 .../core/datastore/DimensionType.java           |  35 ---
 .../carbondata/core/datastore/TableSpec.java    | 116 ++++++---
 .../core/datastore/block/SegmentProperties.java |   4 +-
 .../datastore/chunk/AbstractRawColumnChunk.java |  10 +-
 .../chunk/impl/DimensionRawColumnChunk.java     |   4 +-
 .../chunk/impl/MeasureRawColumnChunk.java       |   4 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |  10 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |  18 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java |  24 +-
 .../measure/AbstractMeasureChunkReader.java     |   6 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |  12 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |  18 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |  22 +-
 .../chunk/store/ColumnPageWrapper.java          |   6 +-
 .../core/datastore/page/ColumnPage.java         | 174 +++++++------
 .../core/datastore/page/LazyColumnPage.java     |   5 +-
 .../datastore/page/SafeFixLengthColumnPage.java |   7 +-
 .../datastore/page/SafeVarLengthColumnPage.java |  19 +-
 .../page/UnsafeFixLengthColumnPage.java         |   7 +-
 .../page/UnsafeVarLengthColumnPage.java         |  16 +-
 .../datastore/page/VarLengthColumnPageBase.java |  36 +--
 .../page/encoding/ColumnPageEncoder.java        |  14 +-
 .../page/encoding/ColumnPageEncoderMeta.java    |  76 ++++--
 .../page/encoding/DefaultEncodingFactory.java   | 250 +++++++++++++++++++
 .../page/encoding/DefaultEncodingStrategy.java  | 243 ------------------
 .../page/encoding/EncodingFactory.java          | 159 ++++++++++++
 .../page/encoding/EncodingStrategy.java         | 159 ------------
 .../page/encoding/EncodingStrategyFactory.java  |  33 ---
 .../page/encoding/adaptive/AdaptiveCodec.java   |   6 -
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  15 +-
 .../AdaptiveDeltaIntegralEncoderMeta.java       |  47 ----
 .../encoding/adaptive/AdaptiveEncoderMeta.java  |  69 -----
 .../adaptive/AdaptiveFloatingCodec.java         |  15 +-
 .../adaptive/AdaptiveFloatingEncoderMeta.java   |  47 ----
 .../adaptive/AdaptiveIntegralCodec.java         |  15 +-
 .../adaptive/AdaptiveIntegralEncoderMeta.java   |  47 ----
 .../encoding/compress/DirectCompressCodec.java  |  24 +-
 .../compress/DirectCompressorEncoderMeta.java   |  57 -----
 .../datastore/page/encoding/rle/RLECodec.java   |  17 +-
 .../page/encoding/rle/RLEEncoderMeta.java       |   6 +-
 .../statistics/PrimitivePageStatsCollector.java |   8 +-
 .../core/scan/complextypes/ArrayQueryType.java  |   7 +-
 .../scan/complextypes/ComplexQueryType.java     |  15 +-
 .../scan/complextypes/PrimitiveQueryType.java   |   9 +-
 .../core/scan/complextypes/StructQueryType.java |   3 +-
 .../datastore/page/encoding/RLECodecSuite.java  |  10 +-
 .../core/util/CarbonMetadataUtilTest.java       |   5 +-
 examples/spark2/src/main/resources/data.csv     |   1 +
 .../examples/CarbonSessionExample.scala         |   3 +-
 .../TestComplexTypeWithBigArray.scala           | 160 ++++++++++++
 .../execution/CarbonLateDecodeStrategy.scala    |   4 +-
 .../processing/datatypes/ArrayDataType.java     |  11 +
 .../processing/datatypes/GenericDataType.java   |   4 +
 .../processing/datatypes/PrimitiveDataType.java |  16 +-
 .../processing/datatypes/StructDataType.java    |  15 ++
 .../carbondata/processing/store/TablePage.java  |  52 ++--
 .../util/CarbonDataProcessorUtil.java           |   6 +-
 59 files changed, 1136 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
index 260ba90..bc748c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
@@ -312,11 +312,6 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
     }
   }
 
-  /**
-   * getDataType().
-   *
-   * @return
-   */
   public DataType getDataType() {
     return dataType;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
new file mode 100644
index 0000000..f98307b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+public enum ColumnType {
+  // global dictionary for low cardinality dimension column
+  GLOBAL_DICTIONARY,
+
+  // for timestamp and date column
+  DIRECT_DICTIONARY,
+
+  // for high cardinality dimension column
+  PLAIN_VALUE,
+
+  // complex column (array, struct, map)
+  COMPLEX,
+
+  // measure column, numerical data type
+  MEASURE;
+
+  public static ColumnType valueOf(int ordinal) {
+    if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
+      return GLOBAL_DICTIONARY;
+    } else if (ordinal == DIRECT_DICTIONARY.ordinal()) {
+      return DIRECT_DICTIONARY;
+    } else if (ordinal == PLAIN_VALUE.ordinal()) {
+      return PLAIN_VALUE;
+    } else if (ordinal == COMPLEX.ordinal()) {
+      return COMPLEX;
+    } else if (ordinal == MEASURE.ordinal()) {
+      return MEASURE;
+    } else {
+      throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java b/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
deleted file mode 100644
index f38b675..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-public enum DimensionType {
-  // global dictionary for low cardinality dimension
-  GLOBAL_DICTIONARY,
-
-  // for timestamp and date column
-  DIRECT_DICTIONARY,
-
-  // no dictionary, for high cardinality dimension
-  PLAIN_VALUE,
-
-  // expanded column from a complex data type column
-  COMPLEX,
-
-  // column group, multiple columns encoded as one column
-  COLUMN_GROUP
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 818f46e..5492f7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -17,9 +17,13 @@
 
 package org.apache.carbondata.core.datastore;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 
@@ -56,16 +60,16 @@ public class TableSpec {
       CarbonDimension dimension = dimensions.get(i);
       if (dimension.isColumnar()) {
         if (dimension.isComplex()) {
-          DimensionSpec spec = new DimensionSpec(DimensionType.COMPLEX, dimension);
+          DimensionSpec spec = new DimensionSpec(ColumnType.COMPLEX, dimension);
           dimensionSpec[dimIndex++] = spec;
         } else if (dimension.isDirectDictionaryEncoding()) {
-          DimensionSpec spec = new DimensionSpec(DimensionType.DIRECT_DICTIONARY, dimension);
+          DimensionSpec spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension);
           dimensionSpec[dimIndex++] = spec;
         } else if (dimension.isGlobalDictionaryEncoding()) {
-          DimensionSpec spec = new DimensionSpec(DimensionType.GLOBAL_DICTIONARY, dimension);
+          DimensionSpec spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension);
           dimensionSpec[dimIndex++] = spec;
         } else {
-          DimensionSpec spec = new DimensionSpec(DimensionType.PLAIN_VALUE, dimension);
+          DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
           dimensionSpec[dimIndex++] = spec;
         }
       }
@@ -103,31 +107,77 @@ public class TableSpec {
     return measureSpec.length;
   }
 
-  public class ColumnSpec {
+  public static class ColumnSpec implements Writable {
     // field name of this column
     private String fieldName;
 
     // data type of this column
-    private DataType dataType;
+    private DataType schemaDataType;
 
-    ColumnSpec(String fieldName, DataType dataType) {
+    // dimension type of this dimension
+    private ColumnType columnType;
+
+    // scale and precision is for decimal column only
+    // TODO: make DataType a class instead of enum
+    private int scale;
+    private int precision;
+
+    public ColumnSpec() {
+    }
+
+    public ColumnSpec(String fieldName, DataType schemaDataType, ColumnType columnType) {
+      this(fieldName, schemaDataType, columnType, 0, 0);
+    }
+
+    public ColumnSpec(String fieldName, DataType schemaDataType, ColumnType columnType,
+        int scale, int precision) {
       this.fieldName = fieldName;
-      this.dataType = dataType;
+      this.schemaDataType = schemaDataType;
+      this.columnType = columnType;
+      this.scale = scale;
+      this.precision = precision;
     }
 
-    public DataType getDataType() {
-      return dataType;
+    public DataType getSchemaDataType() {
+      return schemaDataType;
     }
 
     public String getFieldName() {
       return fieldName;
     }
-  }
 
-  public class DimensionSpec extends ColumnSpec {
+    public ColumnType getColumnType() {
+      return columnType;
+    }
 
-    // dimension type of this dimension
-    private DimensionType type;
+    public int getScale() {
+      return scale;
+    }
+
+    public int getPrecision() {
+      return precision;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(fieldName);
+      out.writeByte(schemaDataType.ordinal());
+      out.writeByte(columnType.ordinal());
+      out.writeInt(scale);
+      out.writeInt(precision);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.fieldName = in.readUTF();
+      this.schemaDataType = DataType.valueOf(in.readByte());
+      this.columnType = ColumnType.valueOf(in.readByte());
+      this.scale = in.readInt();
+      this.precision = in.readInt();
+    }
+  }
+
+  public class DimensionSpec extends ColumnSpec implements Writable {
 
     // indicate whether this dimension is in sort column
     private boolean inSortColumns;
@@ -135,17 +185,12 @@ public class TableSpec {
     // indicate whether this dimension need to do inverted index
     private boolean doInvertedIndex;
 
-    DimensionSpec(DimensionType dimensionType, CarbonDimension dimension) {
-      super(dimension.getColName(), dimension.getDataType());
-      this.type = dimensionType;
+    DimensionSpec(ColumnType columnType, CarbonDimension dimension) {
+      super(dimension.getColName(), dimension.getDataType(), columnType, 0, 0);
       this.inSortColumns = dimension.isSortColumn();
       this.doInvertedIndex = dimension.isUseInvertedIndex();
     }
 
-    public DimensionType getDimensionType() {
-      return type;
-    }
-
     public boolean isInSortColumns() {
       return inSortColumns;
     }
@@ -153,25 +198,32 @@ public class TableSpec {
     public boolean isDoInvertedIndex() {
       return doInvertedIndex;
     }
-  }
 
-  public class MeasureSpec extends ColumnSpec {
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+    }
 
-    private int scale;
-    private int precision;
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+    }
+  }
+
+  public class MeasureSpec extends ColumnSpec implements Writable {
 
     MeasureSpec(String fieldName, DataType dataType, int scale, int precision) {
-      super(fieldName, dataType);
-      this.scale = scale;
-      this.precision = precision;
+      super(fieldName, dataType, ColumnType.MEASURE, scale, precision);
     }
 
-    public int getScale() {
-      return scale;
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
     }
 
-    public int getPrecision() {
-      return precision;
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 23d2129..a742a5b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -650,8 +650,8 @@ public class SegmentProperties {
   public int[] getDimensionColumnsValueSize() {
     int[] dimensionValueSize =
         new int[eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length];
-    System
-        .arraycopy(eachDimColumnValueSize, 0, dimensionValueSize, 0, eachDimColumnValueSize.length);
+    System.arraycopy(
+        eachDimColumnValueSize, 0, dimensionValueSize, 0, eachDimColumnValueSize.length);
     System.arraycopy(eachComplexDimColumnValueSize, 0, dimensionValueSize,
         eachDimColumnValueSize.length, eachComplexDimColumnValueSize.length);
     return dimensionValueSize;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index 3345982..d1362c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -37,7 +37,7 @@ public abstract class AbstractRawColumnChunk {
 
   protected int pagesCount;
 
-  protected int blockletId;
+  protected int columnIndex;
 
   private int offSet;
 
@@ -45,8 +45,8 @@ public abstract class AbstractRawColumnChunk {
 
   private DataChunk3 dataChunkV3;
 
-  public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
-    this.blockletId = blockletId;
+  public AbstractRawColumnChunk(int columnIndex, ByteBuffer rawData, int offSet, int length) {
+    this.columnIndex = columnIndex;
     this.rawData = rawData;
     this.offSet = offSet;
     this.length = length;
@@ -98,8 +98,8 @@ public abstract class AbstractRawColumnChunk {
 
   public abstract void freeMemory();
 
-  public int getBlockletId() {
-    return blockletId;
+  public int getColumnIndex() {
+    return columnIndex;
   }
 
   public int getOffSet() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 1402e06..cb112c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -39,9 +39,9 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
 
   private FileHolder fileHolder;
 
-  public DimensionRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length,
+  public DimensionRawColumnChunk(int columnIndex, ByteBuffer rawData, int offSet, int length,
       DimensionColumnChunkReader columnChunkReader) {
-    super(blockletId, rawData, offSet, length);
+    super(columnIndex, rawData, offSet, length);
     this.chunkReader = columnChunkReader;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 0e0e720..d41cf09 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -39,9 +39,9 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
 
   private FileHolder fileReader;
 
-  public MeasureRawColumnChunk(int blockId, ByteBuffer rawData, int offSet, int length,
+  public MeasureRawColumnChunk(int columnIndex, ByteBuffer rawData, int offSet, int length,
       MeasureColumnChunkReader chunkReader) {
-    super(blockId, rawData, offSet, length);
+    super(columnIndex, rawData, offSet, length);
     this.chunkReader = chunkReader;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index 83e0c74..3e45082 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -79,18 +79,18 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
    * Below method will be used to read the raw chunk based on block index
    *
    * @param fileReader file reader to read the blocks from file
-   * @param blockletIndex block to be read
+   * @param columnIndex column to be read
    * @return dimension column chunk
    */
   @Override public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
-      int blockletIndex) throws IOException {
-    DataChunk dataChunk = dimensionColumnChunk.get(blockletIndex);
+      int columnIndex) throws IOException {
+    DataChunk dataChunk = dimensionColumnChunk.get(columnIndex);
     ByteBuffer buffer = null;
     synchronized (fileReader) {
       buffer = fileReader
           .readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
     }
-    DimensionRawColumnChunk rawColumnChunk = new DimensionRawColumnChunk(blockletIndex, buffer, 0,
+    DimensionRawColumnChunk rawColumnChunk = new DimensionRawColumnChunk(columnIndex, buffer, 0,
         dataChunk.getDataPageLength(), this);
     rawColumnChunk.setFileHolder(fileReader);
     rawColumnChunk.setPagesCount(1);
@@ -100,7 +100,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
 
   @Override public DimensionColumnDataChunk convertToDimensionChunk(
       DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
-    int blockIndex = dimensionRawColumnChunk.getBlockletId();
+    int blockIndex = dimensionRawColumnChunk.getColumnIndex();
     byte[] dataPage = null;
     int[] invertedIndexes = null;
     int[] invertedIndexesReverse = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index bd8de36..0dea099 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -53,26 +53,26 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
    * Below method will be used to read the chunk based on block index
    *
    * @param fileReader    file reader to read the blocks from file
-   * @param blockletIndex block to be read
+   * @param columnIndex   column to be read
    * @return dimension column chunk
    */
-  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int columnIndex)
       throws IOException {
     int length = 0;
-    if (dimensionChunksOffset.size() - 1 == blockletIndex) {
+    if (dimensionChunksOffset.size() - 1 == columnIndex) {
       // Incase of last block read only for datachunk and read remaining while converting it.
-      length = dimensionChunksLength.get(blockletIndex);
+      length = dimensionChunksLength.get(columnIndex);
     } else {
-      long currentDimensionOffset = dimensionChunksOffset.get(blockletIndex);
-      length = (int) (dimensionChunksOffset.get(blockletIndex + 1) - currentDimensionOffset);
+      long currentDimensionOffset = dimensionChunksOffset.get(columnIndex);
+      length = (int) (dimensionChunksOffset.get(columnIndex + 1) - currentDimensionOffset);
     }
     ByteBuffer buffer = null;
     synchronized (fileReader) {
       buffer =
-          fileReader.readByteBuffer(filePath, dimensionChunksOffset.get(blockletIndex), length);
+          fileReader.readByteBuffer(filePath, dimensionChunksOffset.get(columnIndex), length);
     }
     DimensionRawColumnChunk rawColumnChunk =
-        new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
+        new DimensionRawColumnChunk(columnIndex, buffer, 0, length, this);
     rawColumnChunk.setFileHolder(fileReader);
     rawColumnChunk.setPagesCount(1);
     rawColumnChunk.setRowCount(new int[] { numberOfRows });
@@ -123,7 +123,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     int[] rlePage = null;
     DataChunk2 dimensionColumnChunk = null;
     int copySourcePoint = dimensionRawColumnChunk.getOffSet();
-    int blockIndex = dimensionRawColumnChunk.getBlockletId();
+    int blockIndex = dimensionRawColumnChunk.getColumnIndex();
     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
     if (dimensionChunksOffset.size() - 1 == blockIndex) {
       dimensionColumnChunk =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 8ee020d..bb828a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -30,8 +30,8 @@ import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategyFactory;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -55,7 +55,7 @@ import org.apache.commons.lang.ArrayUtils;
  */
 public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
 
-  private EncodingStrategy strategy = EncodingStrategyFactory.getStrategy();
+  private EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
 
   /**
    * end position of last dimension in carbon data file
@@ -213,20 +213,18 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
     int offset = rawColumnPage.getOffSet() + dimensionChunksLength
-        .get(rawColumnPage.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+        .get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
     // first read the data and uncompressed it
     return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
   }
 
-  private DimensionColumnDataChunk decodeDimensionByMeta(DataChunk2 pageMetadata,
+  private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
       ByteBuffer pageData, int offset)
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder decoder = strategy.createDecoder(encodings, encoderMetas);
-    ColumnPage decodedPage = decoder.decode(
-        pageData.array(), offset, pageMetadata.data_page_length);
-    return new ColumnPageWrapper(decodedPage);
+    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+    return decoder.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }
 
   private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
@@ -246,7 +244,9 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
       throws IOException, MemoryException {
     if (isEncodedWithMeta(pageMetadata)) {
-      return decodeDimensionByMeta(pageMetadata, pageData, offset);
+      ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset);
+      return new ColumnPageWrapper(decodedPage,
+          eachColumnValueSize[rawColumnPage.getColumnIndex()]);
     } else {
       // following code is for backward compatibility
       return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
@@ -276,7 +276,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
           CarbonUtil.getIntArray(pageData, offset, pageMetadata.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
-          eachColumnValueSize[rawColumnPage.getBlockletId()]);
+          eachColumnValueSize[rawColumnPage.getColumnIndex()]);
     }
 
     DimensionColumnDataChunk columnDataChunk = null;
@@ -292,7 +292,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       columnDataChunk =
           new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
               pageMetadata.getNumberOfRowsInpage(),
-              eachColumnValueSize[rawColumnPage.getBlockletId()]);
+              eachColumnValueSize[rawColumnPage.getColumnIndex()]);
     }
     return columnDataChunk;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 80c2be0..d781cea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -17,15 +17,15 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategyFactory;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 
 /**
  * Measure block reader abstract class
  */
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
 
-  protected EncodingStrategy strategy = EncodingStrategyFactory.getStrategy();
+  protected EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
 
   /**
    * file path from which blocks will be read

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 257ae71..fcfd862 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -74,15 +74,15 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
    * Method to read the blocks data based on block index
    *
    * @param fileReader file reader to read the blocks
-   * @param blockIndex block to be read
+   * @param columnIndex column to be read
    * @return measure data chunk
    */
-  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int columnIndex)
       throws IOException {
-    DataChunk dataChunk = measureColumnChunks.get(blockIndex);
+    DataChunk dataChunk = measureColumnChunks.get(columnIndex);
     ByteBuffer buffer = fileReader
         .readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
-    MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(blockIndex, buffer, 0,
+    MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(columnIndex, buffer, 0,
         dataChunk.getDataPageLength(), this);
     rawColumnChunk.setFileReader(fileReader);
     rawColumnChunk.setPagesCount(1);
@@ -93,10 +93,10 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
   @Override
   public ColumnPage convertToColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException {
-    int blockIndex = measureRawColumnChunk.getBlockletId();
+    int blockIndex = measureRawColumnChunk.getColumnIndex();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-    ColumnPageDecoder codec = strategy.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
     ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
     decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 20b910d..001c240 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -48,22 +48,22 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   }
 
   @Override
-  public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+  public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int columnIndex)
       throws IOException {
     int dataLength = 0;
-    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
-      dataLength = measureColumnChunkLength.get(blockIndex);
+    if (measureColumnChunkOffsets.size() - 1 == columnIndex) {
+      dataLength = measureColumnChunkLength.get(columnIndex);
     } else {
-      long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
-      dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset);
+      long currentMeasureOffset = measureColumnChunkOffsets.get(columnIndex);
+      dataLength = (int) (measureColumnChunkOffsets.get(columnIndex + 1) - currentMeasureOffset);
     }
     ByteBuffer buffer = null;
     synchronized (fileReader) {
       buffer = fileReader
-          .readByteBuffer(filePath, measureColumnChunkOffsets.get(blockIndex), dataLength);
+          .readByteBuffer(filePath, measureColumnChunkOffsets.get(columnIndex), dataLength);
     }
     MeasureRawColumnChunk rawColumnChunk =
-        new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+        new MeasureRawColumnChunk(columnIndex, buffer, 0, dataLength, this);
     rawColumnChunk.setFileReader(fileReader);
     rawColumnChunk.setPagesCount(1);
     rawColumnChunk.setRowCount(new int[] { numberOfRows });
@@ -111,7 +111,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   public ColumnPage convertToColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException {
     int copyPoint = measureRawColumnChunk.getOffSet();
-    int blockIndex = measureRawColumnChunk.getBlockletId();
+    int blockIndex = measureRawColumnChunk.getColumnIndex();
     ByteBuffer rawData = measureRawColumnChunk.getRawData();
     DataChunk2 measureColumnChunk = CarbonUtil.readDataChunk(rawData, copyPoint,
         measureColumnChunkLength.get(blockIndex));
@@ -131,7 +131,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     byte[] encodedMeta = encoder_meta.get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta);
-    ColumnPageDecoder codec = strategy.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 6f126a5..e207c82 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -67,36 +67,36 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
    * 5. Create the raw chunk object and fill the details
    *
    * @param fileReader          reader for reading the column from carbon data file
-   * @param blockletColumnIndex          blocklet index of the column in carbon data file
+   * @param columnIndex         column to be read
    * @return measure raw chunk
    */
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader,
-      int blockletColumnIndex) throws IOException {
+      int columnIndex) throws IOException {
     int dataLength = 0;
     // to calculate the length of the data to be read
     // column other than last column we can subtract the offset of current column with
     // next column and get the total length.
     // but for last column we need to use lastDimensionOffset which is the end position
     // of the last dimension, we can subtract current dimension offset from lastDimesionOffset
-    if (measureColumnChunkOffsets.size() - 1 == blockletColumnIndex) {
-      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockletColumnIndex));
+    if (measureColumnChunkOffsets.size() - 1 == columnIndex) {
+      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(columnIndex));
     } else {
       dataLength =
-          (int) (measureColumnChunkOffsets.get(blockletColumnIndex + 1) - measureColumnChunkOffsets
-              .get(blockletColumnIndex));
+          (int) (measureColumnChunkOffsets.get(columnIndex + 1) - measureColumnChunkOffsets
+              .get(columnIndex));
     }
     ByteBuffer buffer = null;
     // read the data from carbon data file
     synchronized (fileReader) {
       buffer = fileReader
-          .readByteBuffer(filePath, measureColumnChunkOffsets.get(blockletColumnIndex), dataLength);
+          .readByteBuffer(filePath, measureColumnChunkOffsets.get(columnIndex), dataLength);
     }
     // get the data chunk which will have all the details about the data pages
     DataChunk3 dataChunk =
-        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockletColumnIndex));
+        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(columnIndex));
     // creating a raw chunks instance and filling all the details
     MeasureRawColumnChunk rawColumnChunk =
-        new MeasureRawColumnChunk(blockletColumnIndex, buffer, 0, dataLength, this);
+        new MeasureRawColumnChunk(columnIndex, buffer, 0, dataLength, this);
     int numberOfPages = dataChunk.getPage_length().size();
     byte[][] maxValueOfEachPage = new byte[numberOfPages][];
     byte[][] minValueOfEachPage = new byte[numberOfPages][];
@@ -209,7 +209,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
     int offset = rawColumnPage.getOffSet() +
-        measureColumnChunkLength.get(rawColumnPage.getBlockletId()) +
+        measureColumnChunkLength.get(rawColumnPage.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
     ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnPage.getRawData(), offset);
     decodedPage.setNullBits(getNullBitSet(pageMetadata.presence));
@@ -223,7 +223,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
       throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder codec = strategy.createDecoder(encodings, encoderMetas);
+    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas);
     return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 5f09ffa..21e130b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -25,9 +25,11 @@ import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 public class ColumnPageWrapper implements DimensionColumnDataChunk {
 
   private ColumnPage columnPage;
+  private int columnValueSize;
 
-  public ColumnPageWrapper(ColumnPage columnPage) {
+  public ColumnPageWrapper(ColumnPage columnPage, int columnValueSize) {
     this.columnPage = columnPage;
+    this.columnValueSize = columnValueSize;
   }
 
   @Override
@@ -71,7 +73,7 @@ public class ColumnPageWrapper implements DimensionColumnDataChunk {
 
   @Override
   public int getColumnValueSize() {
-    throw new UnsupportedOperationException("internal error");
+    return columnValueSize;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 2e7bb3a..0be409e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -22,7 +22,11 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -42,10 +46,14 @@ import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT_INT;
 
 public abstract class ColumnPage {
 
+  // number of row in this page
   protected final int pageSize;
+
+  // data type of the page storage
   protected final DataType dataType;
-  protected int scale;
-  protected int precision;
+
+  // specification of this column
+  private final TableSpec.ColumnSpec columnSpec;
 
   // The index of the rowId whose value is null, will be set to 1
   private BitSet nullBitSet;
@@ -59,13 +67,18 @@ public abstract class ColumnPage {
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
           CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
 
-  protected ColumnPage(DataType dataType, int pageSize, int scale, int precision) {
+  /**
+   * Create a new column page with input data type and page size.
+   */
+  protected ColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    this.columnSpec = columnSpec;
     this.dataType = dataType;
     this.pageSize = pageSize;
-    this.scale = scale;
-    this.precision = precision;
     this.nullBitSet = new BitSet(pageSize);
     if (dataType == DECIMAL) {
+      assert (columnSpec.getColumnType() == ColumnType.MEASURE);
+      int precision = columnSpec.getPrecision();
+      int scale = columnSpec.getScale();
       decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
     }
   }
@@ -117,54 +130,52 @@ public abstract class ColumnPage {
     this.statsCollector = statsCollector;
   }
 
-  private static ColumnPage createVarLengthPage(DataType dataType, int pageSize, int scale,
-      int precision) {
+  private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize) {
     if (unsafe) {
       try {
-        return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
+        return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision);
+      return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
     }
   }
 
-  private static ColumnPage createFixLengthPage(DataType dataType, int pageSize, int scale,
-      int precision) {
+  private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize) {
     if (unsafe) {
       try {
-        return new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision);
+        return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeFixLengthColumnPage(dataType, pageSize, scale, pageSize);
+      return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize);
     }
   }
 
-  private static ColumnPage createPage(DataType dataType, int pageSize, int scale, int precision) {
+  private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize) {
     if (dataType.equals(BYTE_ARRAY) || dataType.equals(DECIMAL)) {
-      return createVarLengthPage(dataType, pageSize, scale, precision);
+      return createVarLengthPage(columnSpec, dataType, pageSize);
     } else {
-      return createFixLengthPage(dataType, pageSize, scale, precision);
+      return createFixLengthPage(columnSpec, dataType, pageSize);
     }
   }
 
-  public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
-    return newPage(dataType, pageSize, -1, -1);
-  }
-
-  public static ColumnPage newDecimalPage(DataType dataType, int pageSize, int scale, int precision)
+  public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize)
     throws MemoryException {
-    return newPage(dataType, pageSize, scale, precision);
+    return newPage(columnSpec, dataType, pageSize);
   }
 
   /**
    * Create a new page of dataType and number of row = pageSize
    */
-  private static ColumnPage newPage(DataType dataType, int pageSize, int scale, int precision)
-      throws MemoryException {
+  public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize) throws MemoryException {
     ColumnPage instance;
     if (unsafe) {
       switch (dataType) {
@@ -175,12 +186,13 @@ public abstract class ColumnPage {
         case LONG:
         case FLOAT:
         case DOUBLE:
-          instance = new UnsafeFixLengthColumnPage(dataType, pageSize, -1, -1);
+          instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
           break;
         case DECIMAL:
         case STRING:
         case BYTE_ARRAY:
-          instance = new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision);
+          instance =
+              new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
           break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -188,32 +200,32 @@ public abstract class ColumnPage {
     } else {
       switch (dataType) {
         case BYTE:
-          instance = newBytePage(new byte[pageSize]);
+          instance = newBytePage(columnSpec, new byte[pageSize]);
           break;
         case SHORT:
-          instance = newShortPage(new short[pageSize]);
+          instance = newShortPage(columnSpec, new short[pageSize]);
           break;
         case SHORT_INT:
-          instance = newShortIntPage(new byte[pageSize * 3]);
+          instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
           break;
         case INT:
-          instance = newIntPage(new int[pageSize]);
+          instance = newIntPage(columnSpec, new int[pageSize]);
           break;
         case LONG:
-          instance = newLongPage(new long[pageSize]);
+          instance = newLongPage(columnSpec, new long[pageSize]);
           break;
         case FLOAT:
-          instance = newFloatPage(new float[pageSize]);
+          instance = newFloatPage(columnSpec, new float[pageSize]);
           break;
         case DOUBLE:
-          instance = newDoublePage(new double[pageSize]);
+          instance = newDoublePage(columnSpec, new double[pageSize]);
           break;
         case DECIMAL:
-          instance = newDecimalPage(new byte[pageSize][], scale, precision);
+          instance = newDecimalPage(columnSpec, new byte[pageSize][]);
           break;
         case STRING:
         case BYTE_ARRAY:
-          instance = new SafeVarLengthColumnPage(dataType, pageSize, -1, -1);
+          instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
           break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -222,68 +234,68 @@ public abstract class ColumnPage {
     return instance;
   }
 
-  public static ColumnPage wrapByteArrayPage(byte[][] byteArray) {
-    ColumnPage columnPage = createPage(BYTE_ARRAY, byteArray.length, -1, -1);
+  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
+    ColumnPage columnPage = createPage(columnSpec, BYTE_ARRAY, byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newBytePage(byte[] byteData) {
-    ColumnPage columnPage = createPage(BYTE, byteData.length,  -1, -1);
+  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData) {
+    ColumnPage columnPage = createPage(columnSpec, BYTE, byteData.length);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  private static ColumnPage newShortPage(short[] shortData) {
-    ColumnPage columnPage = createPage(SHORT, shortData.length,  -1, -1);
+  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData) {
+    ColumnPage columnPage = createPage(columnSpec, SHORT, shortData.length);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  private static ColumnPage newShortIntPage(byte[] shortIntData) {
-    ColumnPage columnPage = createPage(SHORT_INT, shortIntData.length / 3,  -1, -1);
+  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData) {
+    ColumnPage columnPage = createPage(columnSpec, SHORT_INT, shortIntData.length / 3);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
-  private static ColumnPage newIntPage(int[] intData) {
-    ColumnPage columnPage = createPage(INT, intData.length,  -1, -1);
+  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData) {
+    ColumnPage columnPage = createPage(columnSpec, INT, intData.length);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  private static ColumnPage newLongPage(long[] longData) {
-    ColumnPage columnPage = createPage(LONG, longData.length,  -1, -1);
+  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData) {
+    ColumnPage columnPage = createPage(columnSpec, LONG, longData.length);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  private static ColumnPage newFloatPage(float[] floatData) {
-    ColumnPage columnPage = createPage(FLOAT, floatData.length,  -1, -1);
+  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData) {
+    ColumnPage columnPage = createPage(columnSpec, FLOAT, floatData.length);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  private static ColumnPage newDoublePage(double[] doubleData) {
-    ColumnPage columnPage = createPage(DOUBLE, doubleData.length, -1, -1);
+  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData) {
+    ColumnPage columnPage = createPage(columnSpec, DOUBLE, doubleData.length);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(byte[][] byteArray, int scale, int precision) {
-    ColumnPage columnPage = createPage(DECIMAL, byteArray.length, scale, precision);
+  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
+    ColumnPage columnPage = createPage(columnSpec, DECIMAL, byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray, int scale, int precision)
-      throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray, scale, precision);
+  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedByteArray) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(columnSpec, lvEncodedByteArray);
   }
 
-  private static ColumnPage newLVBytesPage(byte[] lvEncodedByteArray)
-      throws MemoryException {
-    return VarLengthColumnPageBase.newLVBytesColumnPage(lvEncodedByteArray);
+  private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedByteArray) throws MemoryException {
+    return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray);
   }
 
   /**
@@ -538,7 +550,7 @@ public abstract class ColumnPage {
   /**
    * For variable length page, get the flattened data
    */
-  public abstract byte[] getFlattenedBytePage();
+  public abstract byte[] getLVFlattenedBytePage() throws IOException;
 
   /**
    * For decimals
@@ -572,7 +584,7 @@ public abstract class ColumnPage {
       case DECIMAL:
         return compressor.compressByte(getDecimalPage());
       case BYTE_ARRAY:
-        return compressor.compressByte(getFlattenedBytePage());
+        return compressor.compressByte(getLVFlattenedBytePage());
       default:
         throw new UnsupportedOperationException("unsupport compress column page: " + dataType);
     }
@@ -582,47 +594,51 @@ public abstract class ColumnPage {
    * Decompress data and create a column page using the decompressed data,
    * except for decimal page
    */
-  public static ColumnPage decompress(Compressor compressor, DataType dataType,
-      byte[] compressedData, int offset, int length)
+  public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData,
+      int offset, int length)
       throws MemoryException {
-    switch (dataType) {
+    Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+    switch (meta.getStoreDataType()) {
       case BYTE:
         byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-        return newBytePage(byteData);
+        return newBytePage(columnSpec, byteData);
       case SHORT:
         short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-        return newShortPage(shortData);
+        return newShortPage(columnSpec, shortData);
       case SHORT_INT:
         byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-        return newShortIntPage(shortIntData);
+        return newShortIntPage(columnSpec, shortIntData);
       case INT:
         int[] intData = compressor.unCompressInt(compressedData, offset, length);
-        return newIntPage(intData);
+        return newIntPage(columnSpec, intData);
       case LONG:
         long[] longData = compressor.unCompressLong(compressedData, offset, length);
-        return newLongPage(longData);
+        return newLongPage(columnSpec, longData);
       case FLOAT:
         float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-        return newFloatPage(floatData);
+        return newFloatPage(columnSpec, floatData);
       case DOUBLE:
         double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-        return newDoublePage(doubleData);
+        return newDoublePage(columnSpec, doubleData);
       case BYTE_ARRAY:
         byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newLVBytesPage(lvVarBytes);
+        return newLVBytesPage(columnSpec, lvVarBytes);
       default:
-        throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType);
+        throw new UnsupportedOperationException("unsupport uncompress column page: " +
+            meta.getStoreDataType());
     }
   }
 
   /**
    * Decompress decimal data and create a column page
    */
-  public static ColumnPage decompressDecimalPage(Compressor compressor,
-      byte[] compressedData, int offset, int length, int scale, int precision)
-      throws MemoryException {
+  public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
+      int offset, int length) throws MemoryException {
+    Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-    return newDecimalPage(lvEncodedBytes, scale, precision);
+    return newDecimalPage(columnSpec, lvEncodedBytes);
   }
 
   public BitSet getNullBits() {
@@ -632,4 +648,8 @@ public abstract class ColumnPage {
   public void setNullBits(BitSet nullBitSet) {
     this.nullBitSet = nullBitSet;
   }
+
+  public TableSpec.ColumnSpec getColumnSpec() {
+    return columnSpec;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 80e508a..1e90387 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -32,8 +32,7 @@ public class LazyColumnPage extends ColumnPage {
   private ColumnPageValueConverter converter;
 
   private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) {
-    super(columnPage.getDataType(), columnPage.getPageSize(), columnPage.scale,
-        columnPage.precision);
+    super(columnPage.getColumnSpec(), columnPage.getDataType(), columnPage.getPageSize());
     this.columnPage = columnPage;
     this.converter = converter;
   }
@@ -153,7 +152,7 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() {
     throw new UnsupportedOperationException("internal error");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index ca5db95..5e0e822 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil;
 
@@ -36,8 +37,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   private double[] doubleData;
   private byte[] shortIntData;
 
-  SafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) {
-    super(dataType, pageSize, scale, precision);
+  SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    super(columnSpec, dataType, pageSize);
   }
 
   /**
@@ -240,7 +241,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index ac2bfdf..dde6132 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
@@ -26,8 +30,8 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   // for string and decimal data
   private byte[][] byteArrayData;
 
-  SafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) {
-    super(dataType, pageSize, scale, precision);
+  SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    super(columnSpec, dataType, pageSize);
     byteArrayData = new byte[pageSize][];
   }
 
@@ -67,6 +71,17 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override
+  public byte[] getLVFlattenedBytePage() throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    for (byte[] byteArrayDatum : byteArrayData) {
+      out.writeInt(byteArrayDatum.length);
+      out.write(byteArrayDatum);
+    }
+    return stream.toByteArray();
+  }
+
+  @Override
   public byte[][] getByteArrayPage() {
     return byteArrayData;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 2797104..7b55889 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page;
 import java.io.IOException;
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
@@ -52,9 +53,9 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   private static final int floatBits = DataType.FLOAT.getSizeBits();
   private static final int doubleBits = DataType.DOUBLE.getSizeBits();
 
-  UnsafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision)
+  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
       throws MemoryException {
-    super(dataType, pageSize, scale, precision);
+    super(columnSpec, dataType, pageSize);
     switch (dataType) {
       case BYTE:
       case SHORT:
@@ -266,7 +267,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 1c18fc7..85b9b9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -51,12 +52,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   /**
    * create a page
-   * @param dataType data type
-   * @param pageSize number of row
    */
-  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision)
+  UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
       throws MemoryException {
-    super(dataType, pageSize, scale, precision);
+    super(columnSpec, dataType, pageSize);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     baseAddress = memoryBlock.getBaseObject();
@@ -65,13 +64,10 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   /**
    * create a page with initial capacity
-   * @param dataType data type
-   * @param pageSize number of row
-   * @param capacity initial capacity of the page, in bytes
    */
-  UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity,
-      int scale, int precision) throws MemoryException {
-    super(dataType, pageSize, scale, precision);
+  UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+      int capacity) throws MemoryException {
+    super(columnSpec, dataType, pageSize);
     this.capacity = capacity;
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long)(capacity));
     baseAddress = memoryBlock.getBaseObject();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 83b1ca7..9338bbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -17,9 +17,11 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
@@ -35,8 +37,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   // the length of bytes added in the page
   int totalLength;
 
-  VarLengthColumnPageBase(DataType dataType, int pageSize, int scale, int precision) {
-    super(dataType, pageSize, scale, precision);
+  VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    super(columnSpec, dataType, pageSize);
     rowOffset = new int[pageSize + 1];
     totalLength = 0;
   }
@@ -79,29 +81,30 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision)
+  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
       throws MemoryException {
     DecimalConverterFactory.DecimalConverter decimalConverter =
-        DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
+        DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
+            columnSpec.getScale());
     int size = decimalConverter.getSize();
     if (size < 0) {
-      return getLVBytesColumnPage(lvEncodedBytes, DataType.DECIMAL);
+      return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataType.DECIMAL);
     } else {
       // Here the size is always fixed.
-      return getDecimalColumnPage(lvEncodedBytes, scale, precision, size);
+      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
     }
   }
 
   /**
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
-  static ColumnPage newLVBytesColumnPage(byte[] lvEncodedBytes)
+  static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
       throws MemoryException {
-    return getLVBytesColumnPage(lvEncodedBytes, DataType.BYTE_ARRAY);
+    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataType.BYTE_ARRAY);
   }
 
-  private static ColumnPage getDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision,
-      int size) throws MemoryException {
+  private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedBytes, int size) throws MemoryException {
     List<Integer> rowOffset = new ArrayList<>();
     int offset;
     int rowId = 0;
@@ -113,9 +116,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(DECIMAL, rowId, scale, precision);
+      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
     } else {
-      page = new SafeVarLengthColumnPage(DECIMAL, rowId, scale, precision);
+      page = new SafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
     }
 
     // set total length and rowOffset in page
@@ -130,7 +133,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     return page;
   }
 
-  private static ColumnPage getLVBytesColumnPage(byte[] lvEncodedBytes, DataType dataType)
+  private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedBytes, DataType dataType)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory correspondingly
     int rowId = 0;
@@ -155,9 +159,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength, -1, -1);
+      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, numRows, inputDataLength);
     } else {
-      page = new SafeVarLengthColumnPage(dataType, numRows, -1, -1);
+      page = new SafeVarLengthColumnPage(columnSpec, dataType, numRows);
     }
 
     // set total length and rowOffset in page
@@ -309,7 +313,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   abstract void copyBytes(int rowId, byte[] dest, int destOffset, int length);
 
   @Override
-  public byte[] getFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() throws IOException {
     // output LV encoded byte array
     int offset = 0;
     byte[] data = new byte[totalLength + pageSize * 4];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 7a48785..3b5ae57 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -25,12 +25,15 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
@@ -145,10 +148,11 @@ public abstract class ColumnPageEncoder {
 
   private static EncodedColumnPage encodeChildColumn(byte[][] data)
       throws IOException, MemoryException {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    ComplexDimensionIndexCodec codec = new ComplexDimensionIndexCodec(false, false, compressor);
-    ColumnPageEncoder encoder = codec.createEncoder(null);
-    return encoder.encode(ColumnPage.wrapByteArrayPage(data));
+    TableSpec.ColumnSpec spec =
+        new TableSpec.ColumnSpec("complex_inner_column", DataType.BYTE_ARRAY, ColumnType.COMPLEX);
+    ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
+    ColumnPageEncoder encoder = new DirectCompressCodec(DataType.BYTE_ARRAY).createEncoder(null);
+    return encoder.encode(page);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index cea35f0..87eb77a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -35,18 +36,21 @@ import org.apache.carbondata.core.util.DataTypeUtil;
  */
 public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable {
 
-  // data type of this column
-  private DataType dataType;
+  private static final long serialVersionUID = 1905162071950251407L;
+
+  // column spec of this column
+  private transient TableSpec.ColumnSpec columnSpec;
+
+  // storage data type of this column, it could be different from data type in the column spec
+  private DataType storeDataType;
+
+  // compressor name for compressing and decompressing this column
+  private String compressorName;
 
   private int scale;
   private int precision;
 
-  public static final char BYTE_VALUE_MEASURE = 'c';
-  public static final char SHORT_VALUE_MEASURE = 'j';
-  public static final char INT_VALUE_MEASURE = 'k';
-  public static final char BIG_INT_MEASURE = 'd';
   public static final char DOUBLE_MEASURE = 'n';
-  public static final char BIG_DECIMAL_MEASURE = 'b';
   public static final char STRING = 's';
   public static final char TIMESTAMP = 't';
   public static final char DATE = 'x';
@@ -55,14 +59,22 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   public ColumnPageEncoderMeta() {
   }
 
-  public ColumnPageEncoderMeta(DataType dataType, SimpleStatsResult stats) {
-    if (dataType == null) {
-      throw new IllegalArgumentException("data type must not be null");
+  public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
+      SimpleStatsResult stats, String compressorName) {
+    if (columnSpec == null) {
+      throw new IllegalArgumentException("columm spec must not be null");
     }
-    this.dataType = dataType;
-    setType(convertType(dataType));
+    if (storeDataType == null) {
+      throw new IllegalArgumentException("store data type must not be null");
+    }
+    if (compressorName == null) {
+      throw new IllegalArgumentException("compressor must not be null");
+    }
+    this.columnSpec = columnSpec;
+    this.storeDataType = storeDataType;
+    this.compressorName = compressorName;
+    setType(convertType(storeDataType));
     if (stats != null) {
-      assert (stats.getDataType() == dataType);
       setDecimal(stats.getDecimalCount());
       setMaxValue(stats.getMax());
       setMinValue(stats.getMin());
@@ -75,6 +87,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     switch (type) {
       case BYTE:
       case SHORT:
+      case SHORT_INT:
       case INT:
       case LONG:
         return CarbonCommonConstants.BIG_INT_MEASURE;
@@ -95,28 +108,33 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     }
   }
 
-  public DataType getDataType() {
-    return dataType;
+  public DataType getStoreDataType() {
+    return storeDataType;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeByte(dataType.ordinal());
+    columnSpec.write(out);
+    out.writeByte(storeDataType.ordinal());
     out.writeInt(getDecimal());
     out.writeByte(getDataTypeSelected());
     writeMinMax(out);
+    out.writeUTF(compressorName);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    dataType = DataType.valueOf(in.readByte());
+    columnSpec = new TableSpec.ColumnSpec();
+    columnSpec.readFields(in);
+    storeDataType = DataType.valueOf(in.readByte());
     setDecimal(in.readInt());
     setDataTypeSelected(in.readByte());
     readMinMax(in);
+    compressorName = in.readUTF();
   }
 
   private void writeMinMax(DataOutput out) throws IOException {
-    switch (dataType) {
+    switch (columnSpec.getSchemaDataType()) {
       case BYTE:
         out.writeByte((byte) getMaxValue());
         out.writeByte((byte) getMinValue());
@@ -161,12 +179,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
         // TODO: support stats for complex type
         break;
       default:
-        throw new IllegalArgumentException("invalid data type: " + dataType);
+        throw new IllegalArgumentException("invalid data type: " + storeDataType);
     }
   }
 
   private void readMinMax(DataInput in) throws IOException {
-    switch (dataType) {
+    switch (columnSpec.getSchemaDataType()) {
       case BYTE:
         this.setMaxValue(in.readByte());
         this.setMinValue(in.readByte());
@@ -210,7 +228,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
         // TODO: support stats for complex type
         break;
       default:
-        throw new IllegalArgumentException("invalid data type: " + dataType);
+        throw new IllegalArgumentException("invalid data type: " + storeDataType);
     }
   }
 
@@ -227,7 +245,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
    */
   private byte[] getValueAsBytes(Object value) {
     ByteBuffer b;
-    switch (dataType) {
+    switch (storeDataType) {
       case BYTE:
         b = ByteBuffer.allocate(8);
         b.putLong((byte) value);
@@ -260,7 +278,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       case DATE:
         return (byte[]) value;
       default:
-        throw new IllegalArgumentException("Invalid data type: " + dataType);
+        throw new IllegalArgumentException("Invalid data type: " + storeDataType);
     }
   }
 
@@ -271,4 +289,16 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   public int getPrecision() {
     return precision;
   }
+
+  public TableSpec.ColumnSpec getColumnSpec() {
+    return columnSpec;
+  }
+
+  public String getCompressorName() {
+    return compressorName;
+  }
+
+  public DataType getSchemaDataType() {
+    return columnSpec.getSchemaDataType();
+  }
 }