You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/14 09:20:41 UTC

[48/54] [abbrv] carbondata git commit: [CARBONDATA-1400] Fix bug of array column out of bound when writing carbondata file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
new file mode 100644
index 0000000..f08444b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Default factory that selects an encoding based on the column page data type and statistics
+ */
+public class DefaultEncodingFactory extends EncodingFactory {
+
+  private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
+  private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1;
+
+  private static final boolean newWay = false;
+
+  private static EncodingFactory encodingFactory = new DefaultEncodingFactory();
+
+  public static EncodingFactory getInstance() {
+    // TODO: make this configurable once additional EncodingFactory implementations are added
+    return encodingFactory;
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
+    // TODO: add log
+    if (columnSpec instanceof TableSpec.MeasureSpec) {
+      return createEncoderForMeasure(inputPage);
+    } else {
+      if (newWay) {
+        return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
+      } else {
+        assert columnSpec instanceof TableSpec.DimensionSpec;
+        return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
+      }
+    }
+  }
+
+  private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
+      ColumnPage inputPage) {
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    switch (columnSpec.getColumnType()) {
+      case GLOBAL_DICTIONARY:
+      case DIRECT_DICTIONARY:
+      case PLAIN_VALUE:
+        return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
+      case COMPLEX:
+        return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
+      default:
+        throw new RuntimeException("unsupported dimension type: " +
+            columnSpec.getColumnType());
+    }
+  }
+
+  private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec columnSpec) {
+    TableSpec.DimensionSpec dimensionSpec = columnSpec;
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    switch (dimensionSpec.getColumnType()) {
+      case GLOBAL_DICTIONARY:
+        return new DictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor).createEncoder(null);
+      case DIRECT_DICTIONARY:
+        return new DirectDictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor).createEncoder(null);
+      case PLAIN_VALUE:
+        return new HighCardDictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor).createEncoder(null);
+      default:
+        throw new RuntimeException("unsupported dimension type: " +
+            dimensionSpec.getColumnType());
+    }
+  }
+
+  private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
+    SimpleStatsResult stats = columnPage.getStatistics();
+    switch (stats.getDataType()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+      case FLOAT:
+      case DOUBLE:
+        return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
+      case DECIMAL:
+      case BYTE_ARRAY:
+        return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
+      default:
+        throw new RuntimeException("unsupported data type: " + stats.getDataType());
+    }
+  }
+
+  private static DataType fitLongMinMax(long max, long min) {
+    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
+      return DataType.BYTE;
+    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
+      return DataType.SHORT;
+    } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
+      return DataType.SHORT_INT;
+    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
+      return DataType.INT;
+    } else {
+      return DataType.LONG;
+    }
+  }
+
+  private static DataType fitMinMax(DataType dataType, Object max, Object min) {
+    switch (dataType) {
+      case BYTE:
+        return fitLongMinMax((byte) max, (byte) min);
+      case SHORT:
+        return fitLongMinMax((short) max, (short) min);
+      case INT:
+        return fitLongMinMax((int) max, (int) min);
+      case LONG:
+        return fitLongMinMax((long) max, (long) min);
+      case DOUBLE:
+        return fitLongMinMax((long) (double) max, (long) (double) min);
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
+  // fit the delta (max - min) into the smallest data type that can hold it
+  private static DataType fitDelta(DataType dataType, Object max, Object min) {
+    // use long data type to calculate delta to avoid overflow
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = (long)(byte) max - (long)(byte) min;
+        break;
+      case SHORT:
+        value = (long)(short) max - (long)(short) min;
+        break;
+      case INT:
+        value = (long)(int) max - (long)(int) min;
+        break;
+      case LONG:
+        // TODO: add overflow detection and return delta type
+        return DataType.LONG;
+      case DOUBLE:
+        return DataType.LONG;
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+      return DataType.BYTE;
+    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+      return DataType.SHORT;
+    } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
+      return DataType.SHORT_INT;
+    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+      return DataType.INT;
+    } else {
+      return DataType.LONG;
+    }
+  }
+
+  /**
+   * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
+   * size is smaller
+   */
+  static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) {
+    DataType srcDataType = stats.getDataType();
+    DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
+    DataType deltaDataType;
+
+    if (adaptiveDataType == DataType.LONG) {
+      deltaDataType = DataType.LONG;
+    } else {
+      deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
+    }
+    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
+        srcDataType.getSizeInBytes()) {
+      // no effect to use adaptive or delta, use compression only
+      return new DirectCompressCodec(stats.getDataType());
+    }
+    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+      // choose adaptive encoding
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+    } else {
+      // choose delta adaptive encoding
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+    }
+  }
+
+  // choose between the adaptive floating codec and direct compression (or delegate to the
+  // integral selection when decimalCount is 0), based on the target data type size
+  static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) {
+    DataType srcDataType = stats.getDataType();
+    double maxValue = (double) stats.getMax();
+    double minValue = (double) stats.getMin();
+    int decimalCount = stats.getDecimalCount();
+
+    // Use the larger absolute value to pick the data type: for -1 and -10000000 the max is -1,
+    // but the type must be wide enough for -10000000.
+    double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
+
+    if (decimalCount == 0) {
+      // short, int, long
+      return selectCodecByAlgorithmForIntegral(stats);
+    } else if (decimalCount < 0) {
+      return new DirectCompressCodec(DataType.DOUBLE);
+    } else {
+      // double
+      long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
+      DataType adaptiveDataType = fitLongMinMax(max, 0);
+      if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) {
+        return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
+      } else {
+        return new DirectCompressCodec(DataType.DOUBLE);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
deleted file mode 100644
index 04ca8a3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Default strategy will select encoding base on column page data type and statistics
- */
-public class DefaultEncodingStrategy extends EncodingStrategy {
-
-  private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
-  private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1;
-
-  private static final boolean newWay = false;
-
-  @Override
-  public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
-    // TODO: add log
-    if (columnSpec instanceof TableSpec.MeasureSpec) {
-      return createEncoderForMeasure(inputPage);
-    } else {
-      if (newWay) {
-        return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
-      } else {
-        assert columnSpec instanceof TableSpec.DimensionSpec;
-        return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
-      }
-    }
-  }
-
-  private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
-      ColumnPage inputPage) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    switch (columnSpec.getDimensionType()) {
-      case GLOBAL_DICTIONARY:
-      case DIRECT_DICTIONARY:
-      case PLAIN_VALUE:
-        return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
-      case COMPLEX:
-        return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
-      default:
-        throw new RuntimeException("unsupported dimension type: " +
-            columnSpec.getDimensionType());
-    }
-  }
-
-  private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec columnSpec) {
-    TableSpec.DimensionSpec dimensionSpec = columnSpec;
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    switch (dimensionSpec.getDimensionType()) {
-      case GLOBAL_DICTIONARY:
-        return new DictDimensionIndexCodec(
-            dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
-      case DIRECT_DICTIONARY:
-        return new DirectDictDimensionIndexCodec(
-            dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
-      case PLAIN_VALUE:
-        return new HighCardDictDimensionIndexCodec(
-            dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
-      default:
-        throw new RuntimeException("unsupported dimension type: " +
-            dimensionSpec.getDimensionType());
-    }
-  }
-
-  private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
-    SimpleStatsResult stats = columnPage.getStatistics();
-    switch (stats.getDataType()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
-      case FLOAT:
-      case DOUBLE:
-        return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
-      case DECIMAL:
-      case BYTE_ARRAY:
-        return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
-      default:
-        throw new RuntimeException("unsupported data type: " + stats.getDataType());
-    }
-  }
-
-  private static DataType fitLongMinMax(long max, long min) {
-    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
-      return DataType.BYTE;
-    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
-      return DataType.SHORT;
-    } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
-      return DataType.SHORT_INT;
-    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
-      return DataType.INT;
-    } else {
-      return DataType.LONG;
-    }
-  }
-
-  private static DataType fitMinMax(DataType dataType, Object max, Object min) {
-    switch (dataType) {
-      case BYTE:
-        return fitLongMinMax((byte) max, (byte) min);
-      case SHORT:
-        return fitLongMinMax((short) max, (short) min);
-      case INT:
-        return fitLongMinMax((int) max, (int) min);
-      case LONG:
-        return fitLongMinMax((long) max, (long) min);
-      case DOUBLE:
-        return fitLongMinMax((long) (double) max, (long) (double) min);
-      default:
-        throw new RuntimeException("internal error: " + dataType);
-    }
-  }
-
-  // fit the long input value into minimum data type
-  private static DataType fitDelta(DataType dataType, Object max, Object min) {
-    // use long data type to calculate delta to avoid overflow
-    long value;
-    switch (dataType) {
-      case BYTE:
-        value = (long)(byte) max - (long)(byte) min;
-        break;
-      case SHORT:
-        value = (long)(short) max - (long)(short) min;
-        break;
-      case INT:
-        value = (long)(int) max - (long)(int) min;
-        break;
-      case LONG:
-        // TODO: add overflow detection and return delta type
-        return DataType.LONG;
-      case DOUBLE:
-        return DataType.LONG;
-      default:
-        throw new RuntimeException("internal error: " + dataType);
-    }
-    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-      return DataType.BYTE;
-    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-      return DataType.SHORT;
-    } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
-      return DataType.SHORT_INT;
-    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-      return DataType.INT;
-    } else {
-      return DataType.LONG;
-    }
-  }
-
-  /**
-   * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
-   * size is smaller
-   */
-  static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) {
-    DataType srcDataType = stats.getDataType();
-    DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
-    DataType deltaDataType;
-
-    if (adaptiveDataType == DataType.LONG) {
-      deltaDataType = DataType.LONG;
-    } else {
-      deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
-    }
-    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
-        srcDataType.getSizeInBytes()) {
-      // no effect to use adaptive or delta, use compression only
-      return new DirectCompressCodec(stats.getDataType());
-    }
-    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
-      // choose adaptive encoding
-      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
-    } else {
-      // choose delta adaptive encoding
-      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
-    }
-  }
-
-  // choose between upscale adaptive encoder or upscale delta adaptive encoder,
-  // based on whose target data type size is smaller
-  static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) {
-    DataType srcDataType = stats.getDataType();
-    double maxValue = (double) stats.getMax();
-    double minValue = (double) stats.getMin();
-    int decimalCount = stats.getDecimalCount();
-
-    //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
-    //but we can't use -1 to getDatatype, we should use -10000000.
-    double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
-
-    if (decimalCount == 0) {
-      // short, int, long
-      return selectCodecByAlgorithmForIntegral(stats);
-    } else if (decimalCount < 0) {
-      return new DirectCompressCodec(DataType.DOUBLE);
-    } else {
-      // double
-      long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
-      DataType adaptiveDataType = fitLongMinMax(max, 0);
-      if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) {
-        return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
-      } else {
-        return new DirectCompressCodec(DataType.DOUBLE);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
new file mode 100644
index 0000000..9a52183
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.Encoding;
+
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
+import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
+
+/**
+ * Base class for encoding factory implementation.
+ */
+public abstract class EncodingFactory {
+
+  /**
+   * Return new encoder for specified column
+   */
+  public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
+      ColumnPage inputPage);
+
+  /**
+   * Return new decoder based on encoder metadata read from file
+   */
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
+      throws IOException {
+    assert (encodings.size() == 1);
+    assert (encoderMetas.size() == 1);
+    Encoding encoding = encodings.get(0);
+    byte[] encoderMeta = encoderMetas.get(0).array();
+    ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
+    DataInputStream in = new DataInputStream(stream);
+    if (encoding == DIRECT_COMPRESS) {
+      ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.readFields(in);
+      return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_INTEGRAL) {
+      ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
+          stats).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
+      ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
+          metadata.getStoreDataType(), stats).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_FLOATING) {
+      ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
+          stats).createDecoder(metadata);
+    } else if (encoding == RLE_INTEGRAL) {
+      RLEEncoderMeta metadata = new RLEEncoderMeta();
+      metadata.readFields(in);
+      return new RLECodec().createDecoder(metadata);
+    } else {
+      // for backward compatibility
+      ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
+      return createDecoderLegacy(metadata);
+    }
+  }
+
+  /**
+   * Old way of creating decoder, based on algorithm
+   */
+  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+    SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+    TableSpec.ColumnSpec spec = new TableSpec.ColumnSpec("legacy", stats.getDataType(),
+        ColumnType.MEASURE);
+    String compressor = "snappy";
+    switch (metadata.getType()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        // create the codec based on algorithm and create decoder by recovering the metadata
+        ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats);
+        if (codec instanceof AdaptiveIntegralCodec) {
+          AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
+          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+              adaptiveCodec.getTargetDataType(), stats, compressor);
+          return codec.createDecoder(meta);
+        } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
+          AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
+          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+              adaptiveCodec.getTargetDataType(), stats, compressor);
+          return codec.createDecoder(meta);
+        } else if (codec instanceof DirectCompressCodec) {
+          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+              metadata.getType(), stats, compressor);
+          return codec.createDecoder(meta);
+        } else {
+          throw new RuntimeException("internal error");
+        }
+      case FLOAT:
+      case DOUBLE:
+        // create the codec based on algorithm and create decoder by recovering the metadata
+        codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats);
+        if (codec instanceof AdaptiveFloatingCodec) {
+          AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
+          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+              adaptiveCodec.getTargetDataType(), stats, compressor);
+          return codec.createDecoder(meta);
+        } else if (codec instanceof DirectCompressCodec) {
+          ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+              metadata.getType(), stats, compressor);
+          return codec.createDecoder(meta);
+        } else {
+          throw new RuntimeException("internal error");
+        }
+      case DECIMAL:
+      case BYTE_ARRAY:
+        // no dictionary dimension
+        return new DirectCompressCodec(stats.getDataType()).createDecoder(
+            new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+      default:
+        throw new RuntimeException("unsupported data type: " + stats.getDataType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
deleted file mode 100644
index 3b7b10c..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressorEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
-import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.Encoding;
-
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
-import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
-
-/**
- * Base class for encoding strategy implementation.
- */
-public abstract class EncodingStrategy {
-
-  /**
-   * Return new encoder for specified column
-   */
-  public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
-      ColumnPage inputPage);
-
-  /**
-   * Return new decoder based on encoder metadata read from file
-   */
-  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
-      throws IOException {
-    assert (encodings.size() == 1);
-    assert (encoderMetas.size() == 1);
-    Encoding encoding = encodings.get(0);
-    byte[] encoderMeta = encoderMetas.get(0).array();
-    ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
-    DataInputStream in = new DataInputStream(stream);
-    if (encoding == DIRECT_COMPRESS) {
-      DirectCompressorEncoderMeta metadata = new DirectCompressorEncoderMeta();
-      metadata.readFields(in);
-      return new DirectCompressCodec(metadata.getDataType()).createDecoder(metadata);
-    } else if (encoding == ADAPTIVE_INTEGRAL) {
-      AdaptiveIntegralEncoderMeta metadata = new AdaptiveIntegralEncoderMeta();
-      metadata.readFields(in);
-      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
-      return new AdaptiveIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
-          stats).createDecoder(metadata);
-    } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
-      AdaptiveDeltaIntegralEncoderMeta metadata = new AdaptiveDeltaIntegralEncoderMeta();
-      metadata.readFields(in);
-      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
-      return new AdaptiveDeltaIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
-          stats).createDecoder(metadata);
-    } else if (encoding == RLE_INTEGRAL) {
-      RLEEncoderMeta metadata = new RLEEncoderMeta();
-      metadata.readFields(in);
-      return new RLECodec().createDecoder(metadata);
-    } else if (encoding == ADAPTIVE_FLOATING) {
-      AdaptiveFloatingEncoderMeta metadata = new AdaptiveFloatingEncoderMeta();
-      metadata.readFields(in);
-      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
-      return new AdaptiveFloatingCodec(metadata.getDataType(), metadata.getTargetDataType(),
-          stats).createDecoder(metadata);
-    } else {
-      // for backward compatibility
-      ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
-      return createDecoderLegacy(metadata);
-    }
-  }
-
-  /**
-   * Old way of creating decoder, based on algorithm
-   */
-  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
-    SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
-    switch (metadata.getType()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        // create the codec based on algorithm and create decoder by recovering the metadata
-        ColumnPageCodec codec = DefaultEncodingStrategy.selectCodecByAlgorithmForIntegral(stats);
-        if (codec instanceof AdaptiveIntegralCodec) {
-          AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
-          AdaptiveIntegralEncoderMeta meta = new AdaptiveIntegralEncoderMeta(
-              "snappy", adaptiveCodec.getTargetDataType(), stats);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
-          AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
-          AdaptiveDeltaIntegralEncoderMeta meta = new AdaptiveDeltaIntegralEncoderMeta(
-              "snappy", adaptiveCodec.getTargetDataType(), stats);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof DirectCompressCodec) {
-          DirectCompressorEncoderMeta meta = new DirectCompressorEncoderMeta(
-              "snappy", metadata.getType(), stats);
-          return codec.createDecoder(meta);
-        } else {
-          throw new RuntimeException("internal error");
-        }
-      case FLOAT:
-      case DOUBLE:
-        // create the codec based on algorithm and create decoder by recovering the metadata
-        codec = DefaultEncodingStrategy.selectCodecByAlgorithmForFloating(stats);
-        if (codec instanceof AdaptiveFloatingCodec) {
-          AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
-          AdaptiveFloatingEncoderMeta meta = new AdaptiveFloatingEncoderMeta(
-              "snappy", adaptiveCodec.getTargetDataType(), stats);
-          return codec.createDecoder(meta);
-        } else if (codec instanceof DirectCompressCodec) {
-          DirectCompressorEncoderMeta meta = new DirectCompressorEncoderMeta(
-              "snappy", metadata.getType(), stats);
-          return codec.createDecoder(meta);
-        } else {
-          throw new RuntimeException("internal error");
-        }
-      case DECIMAL:
-      case BYTE_ARRAY:
-        // no dictionary dimension
-        return new DirectCompressCodec(stats.getDataType()).createDecoder(
-            new DirectCompressorEncoderMeta("snappy", stats.getDataType(), stats));
-      default:
-        throw new RuntimeException("unsupported data type: " + stats.getDataType());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
deleted file mode 100644
index 56527cb..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-/**
- * Factory to create Encoding Strategy.
- * Now only a default strategy is supported which will choose encoding based on data type
- * and column data stats.
- */
-public class EncodingStrategyFactory {
-
-  private static EncodingStrategy defaultStrategy = new DefaultEncodingStrategy();
-
-  public static EncodingStrategy getStrategy() {
-    // TODO: make it configurable after added new strategy
-    return defaultStrategy;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index 135c317..ece5cb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -17,9 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
@@ -47,10 +45,6 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
     this.targetDataType = targetDataType;
   }
 
-  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
-    throw new UnsupportedOperationException("internal error");
-  }
-
   public DataType getTargetDataType() {
     return targetDataType;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 60ff3ab..ad327f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -87,7 +87,8 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+            input.getPageSize());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -96,8 +97,8 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
 
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-        return new AdaptiveDeltaIntegralEncoderMeta(
-            compressor.getName(), targetDataType, inputPage.getStatistics());
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType,
+            inputPage.getStatistics(), compressor.getName());
       }
 
       @Override
@@ -111,16 +112,12 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   }
 
   @Override
-  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    assert meta instanceof AdaptiveDeltaIntegralEncoderMeta;
-    AdaptiveDeltaIntegralEncoderMeta codecMeta = (AdaptiveDeltaIntegralEncoderMeta) meta;
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
-        codecMeta.getCompressorName());
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
         return LazyColumnPage.newPage(page, converter);
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
deleted file mode 100644
index c2d86d9..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveDeltaIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
-  public AdaptiveDeltaIntegralEncoderMeta() {
-  }
-
-  public AdaptiveDeltaIntegralEncoderMeta(String compressorName, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(targetDataType, stats, compressorName);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
deleted file mode 100644
index 3104dd6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-/**
- * Metadata for AdaptiveIntegralCodec and DeltaIntegralCodec
- */
-public class AdaptiveEncoderMeta extends ColumnPageEncoderMeta implements Writable {
-
-  private DataType targetDataType;
-  private String compressorName;
-
-  AdaptiveEncoderMeta() {
-
-  }
-
-  AdaptiveEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
-      String compressorName) {
-    super(stats.getDataType(), stats);
-    this.targetDataType = targetDataType;
-    this.compressorName = compressorName;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    out.writeByte(targetDataType.ordinal());
-    out.writeUTF(compressorName);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    this.targetDataType = DataType.valueOf(in.readByte());
-    this.compressorName = in.readUTF();
-  }
-
-  public DataType getTargetDataType() {
-    return targetDataType;
-  }
-
-  public String getCompressorName() {
-    return compressorName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 789383c..c238245 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -71,7 +71,8 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+            input.getPageSize());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -87,24 +88,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-        return new AdaptiveFloatingEncoderMeta(compressor.getName(), targetDataType, stats);
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
+            compressor.getName());
       }
 
     };
   }
 
   @Override
-  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    assert meta instanceof AdaptiveFloatingEncoderMeta;
-    AdaptiveFloatingEncoderMeta codecMeta = (AdaptiveFloatingEncoderMeta) meta;
-    final Compressor compressor =
-        CompressorFactory.getInstance().getCompressor(codecMeta.getCompressorName());
-    final DataType targetDataType = codecMeta.getTargetDataType();
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
         return LazyColumnPage.newPage(page, converter);
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
deleted file mode 100644
index 085e751..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveFloatingEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
-  public AdaptiveFloatingEncoderMeta() {
-  }
-
-  public AdaptiveFloatingEncoderMeta(String compressorName, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(targetDataType, stats, compressorName);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 543a86e..6df2e64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -62,7 +62,8 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+            input.getPageSize());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -78,24 +79,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-        return new AdaptiveIntegralEncoderMeta(compressor.getName(), targetDataType, stats);
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
+            compressor.getName());
       }
 
     };
   }
 
   @Override
-  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    assert meta instanceof AdaptiveIntegralEncoderMeta;
-    AdaptiveIntegralEncoderMeta codecMeta = (AdaptiveIntegralEncoderMeta) meta;
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
-        codecMeta.getCompressorName());
-    final DataType targetDataType = codecMeta.getTargetDataType();
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
         return LazyColumnPage.newPage(page, converter);
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
deleted file mode 100644
index 0a4f399..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
-  public AdaptiveIntegralEncoderMeta() {
-  }
-
-  public AdaptiveIntegralEncoderMeta(String compressorName, DataType targetDataType,
-      SimpleStatsResult stats) {
-    super(targetDataType, stats, compressorName);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index cb1508f..13879b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -60,10 +60,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
   @Override
   public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    assert meta instanceof DirectCompressorEncoderMeta;
-    DirectCompressorEncoderMeta codecMeta = (DirectCompressorEncoderMeta) meta;
-    return new DirectDecompressor(codecMeta.getCompressorName(),
-        codecMeta.getScale(), codecMeta.getPrecision());
+    return new DirectDecompressor(meta);
   }
 
   private static class DirectCompressor extends ColumnPageEncoder {
@@ -88,32 +85,27 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
     @Override
     protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-      return new DirectCompressorEncoderMeta(compressor.getName(), inputPage.getDataType(),
-          inputPage.getStatistics());
+      return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
+          inputPage.getStatistics(), compressor.getName());
     }
 
   }
 
   private class DirectDecompressor implements ColumnPageDecoder {
 
-    private Compressor compressor;
-    private int scale;
-    private int precision;
+    private ColumnPageEncoderMeta meta;
 
-    DirectDecompressor(String compressorName, int scale, int precision) {
-      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
-      this.scale = scale;
-      this.precision = precision;
+    DirectDecompressor(ColumnPageEncoderMeta meta) {
+      this.meta = meta;
     }
 
     @Override
     public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
       ColumnPage decodedPage;
       if (dataType == DataType.DECIMAL) {
-        decodedPage = ColumnPage.decompressDecimalPage(compressor, input, offset, length,
-            scale, precision);
+        decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
       } else {
-        decodedPage = ColumnPage.decompress(compressor, dataType, input, offset, length);
+        decodedPage = ColumnPage.decompress(meta, input, offset, length);
       }
       return LazyColumnPage.newPage(decodedPage, converter);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
deleted file mode 100644
index cf19259..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.compress;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class DirectCompressorEncoderMeta extends ColumnPageEncoderMeta implements Writable {
-  private String compressorName;
-
-  public DirectCompressorEncoderMeta() {
-  }
-
-  public DirectCompressorEncoderMeta(String compressorName, final DataType dataType,
-      SimpleStatsResult stats) {
-    super(dataType, stats);
-    this.compressorName = compressorName;
-  }
-
-  public String getCompressorName() {
-    return compressorName;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    out.writeUTF(compressorName);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    compressorName = in.readUTF();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index 12690a5..419b589 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -64,7 +65,7 @@ public class RLECodec implements ColumnPageCodec {
   public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
     assert meta instanceof RLEEncoderMeta;
     RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
-    return new RLEDecoder(codecMeta.getDataType(), codecMeta.getPageSize());
+    return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize());
   }
 
   // This codec supports integral type only
@@ -157,7 +158,7 @@ public class RLECodec implements ColumnPageCodec {
 
     @Override
     protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-      return new RLEEncoderMeta(
+      return new RLEEncoderMeta(inputPage.getColumnSpec(),
           inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
     }
 
@@ -291,21 +292,21 @@ public class RLECodec implements ColumnPageCodec {
   // TODO: add a on-the-fly decoder for filter query with high selectivity
   private class RLEDecoder implements ColumnPageDecoder {
 
-    // src data type
-    private DataType dataType;
+    private TableSpec.ColumnSpec columnSpec;
     private int pageSize;
 
-    private RLEDecoder(DataType dataType, int pageSize) {
-      validateDataType(dataType);
-      this.dataType = dataType;
+    private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) {
+      validateDataType(columnSpec.getSchemaDataType());
+      this.columnSpec = columnSpec;
       this.pageSize = pageSize;
     }
 
     @Override
     public ColumnPage decode(byte[] input, int offset, int length)
         throws MemoryException, IOException {
+      DataType dataType = columnSpec.getSchemaDataType();
       DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
-      ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize);
+      ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
       switch (dataType) {
         case BYTE:
           decodeBytePage(in, resultPage);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
index 5d68872..8871671 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -37,8 +38,9 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
 
   }
 
-  public RLEEncoderMeta(DataType dataType, int pageSize, SimpleStatsResult stats) {
-    super(dataType, stats);
+  public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+      SimpleStatsResult stats) {
+    super(columnSpec, dataType, stats, "");
     this.pageSize = pageSize;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 4fb891f..2f6178b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -51,10 +51,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   // this is for decode flow, create stats from encoder meta in carbondata file
   public static PrimitivePageStatsCollector newInstance(ColumnPageEncoderMeta meta) {
-    PrimitivePageStatsCollector instance =
-        new PrimitivePageStatsCollector(meta.getDataType(), meta.getScale(), meta.getPrecision());
+    PrimitivePageStatsCollector instance = new PrimitivePageStatsCollector(meta.getSchemaDataType(),
+        meta.getScale(), meta.getPrecision());
     // set min max from meta
-    switch (meta.getDataType()) {
+    switch (meta.getSchemaDataType()) {
       case BYTE:
         instance.minByte = (byte) meta.getMinValue();
         instance.maxByte = (byte) meta.getMaxValue();
@@ -85,7 +85,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         break;
       default:
         throw new UnsupportedOperationException(
-            "unsupported data type for stats collection: " + meta.getDataType());
+            "unsupported data type for stats collection: " + meta.getSchemaDataType());
     }
     return instance;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 4caa0b3..39227a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -63,16 +63,15 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
 
   public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
       int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    byte[] input = new byte[8];
-    copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, input);
+    byte[] input = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int dataLength = byteArray.getInt();
     dataOutputStream.writeInt(dataLength);
     if (dataLength > 0) {
-      int columnIndex = byteArray.getInt();
+      int dataOffset = byteArray.getInt();
       for (int i = 0; i < dataLength; i++) {
         children
-            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, columnIndex++, pageNumber,
+            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dataOffset++, pageNumber,
                 dataOutputStream);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
index 2274186..ee43a10 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -36,17 +36,16 @@ public class ComplexQueryType {
   }
 
   /**
-   * Method will copy the block chunk holder data to the passed
-   * byte[], this method is also used by child
-   *
-   * @param rowNumber
-   * @param input
+   * Method will copy the block chunk holder data and return the cloned value.
+   * This method is also used by child.
    */
-  protected void copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
-      int rowNumber, int pageNumber, byte[] input) {
+  protected byte[] copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
+      int rowNumber, int pageNumber) {
     byte[] data =
         rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber).getChunkData(rowNumber);
-    System.arraycopy(data, 0, input, 0, data.length);
+    byte[] output = new byte[data.length];
+    System.arraycopy(data, 0, output, 0, output.length);
+    return output;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 9c9be86..56c265b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -46,8 +45,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
 
   private int keySize;
 
-  private int blockIndex;
-
   private Dictionary dictionary;
 
   private org.apache.carbondata.core.metadata.datatype.DataType dataType;
@@ -63,7 +60,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     this.dictionary = dictionary;
     this.name = name;
     this.parentname = parentname;
-    this.blockIndex = blockIndex;
     this.isDirectDictionary = isDirectDictionary;
   }
 
@@ -95,10 +91,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
   @Override public void parseBlocksAndReturnComplexColumnByteArray(
       DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
       int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    DimensionColumnDataChunk dataChunk =
-        rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber);
-    byte[] currentVal = new byte[dataChunk.getColumnValueSize()];
-    copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, currentVal);
+    byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
     dataOutputStream.write(currentVal);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index bb64e92..23a9f81 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -84,8 +84,7 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
   @Override public void parseBlocksAndReturnComplexColumnByteArray(
       DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber,
       int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    byte[] input = new byte[8];
-    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber, input);
+    byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int childElement = byteArray.getInt();
     dataOutputStream.writeInt(childElement);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
index 9e17717..79d3388 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
@@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
 import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
@@ -42,7 +44,9 @@ public class RLECodecSuite {
 
     TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException {
       this.inputByteData = inputByteData;
-      inputBytePage = ColumnPage.newPage(DataType.BYTE, inputByteData.length);
+      inputBytePage = ColumnPage.newPage(
+          new TableSpec.ColumnSpec("test", DataType.BYTE, ColumnType.MEASURE),
+          DataType.BYTE, inputByteData.length);
       inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.BYTE, 0, 0));
       for (int i = 0; i < inputByteData.length; i++) {
         inputBytePage.putData(i, inputByteData[i]);
@@ -125,7 +129,9 @@ public class RLECodecSuite {
 
   private void testBytePageDecode(byte[] inputBytes, byte[] expectedDecodedBytes) throws IOException, MemoryException {
     RLECodec codec = new RLECodec();
-    RLEEncoderMeta meta = new RLEEncoderMeta(DataType.BYTE, expectedDecodedBytes.length, null);
+    RLEEncoderMeta meta = new RLEEncoderMeta(
+        new TableSpec.ColumnSpec("test", DataType.BYTE, ColumnType.MEASURE),
+        DataType.BYTE, expectedDecodedBytes.length, null);
     ColumnPageDecoder decoder = codec.createDecoder(meta);
     ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length);
     byte[] decoded = page.getBytePage();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3e1b63b..35b45ca 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -19,13 +19,12 @@ package org.apache.carbondata.core.util;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveEncoderMeta;
 import org.apache.carbondata.core.datastore.page.key.TablePageKey;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -124,7 +123,7 @@ public class CarbonMetadataUtilTest {
     meta.setDecimal(5);
     meta.setMinValue(objMinArr);
     meta.setMaxValue(objMaxArr);
-    meta.setType(AdaptiveEncoderMeta.DOUBLE_MEASURE);
+    meta.setType(ColumnPageEncoderMeta.DOUBLE_MEASURE);
 
     List<Encoding> encoders = new ArrayList<>();
     encoders.add(Encoding.INVERTED_INDEX);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 3061ec7..a63fa65 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,3 +1,4 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
 1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
 5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
 1,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 3b2094a..c0429b5 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -78,8 +78,7 @@ object CarbonSessionExample {
       s"""
          | LOAD DATA LOCAL INPATH '$path'
          | INTO TABLE carbon_table
-         | OPTIONS('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData',
-         | 'COMPLEX_DELIMITER_LEVEL_1'='#')
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
        """.stripMargin)
     // scalastyle:on