You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/23 07:24:07 UTC

[5/7] carbondata git commit: [CARBONDATA-1371] Support creating decoder based on encoding metadata

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index b122615..79c8101 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -17,11 +17,32 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressorEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.Encoding;
+
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
+import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
 
 /**
  * Base class for encoding strategy implementation.
@@ -29,87 +50,69 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 public abstract class EncodingStrategy {
 
   /**
-   * create codec based on the page data type and statistics
+   * Return new encoder for specified column
+   */
+  public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
+      ColumnPage inputPage);
+
+  /**
+   * Return new decoder based on encoder metadata read from file
+   */
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
+      throws IOException {
+    assert (encodings.size() == 1);
+    assert (encoderMetas.size() == 1);
+    Encoding encoding = encodings.get(0);
+    byte[] encoderMeta = encoderMetas.get(0).array();
+    ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
+    DataInputStream in = new DataInputStream(stream);
+    if (encoding == DIRECT_COMPRESS) {
+      DirectCompressorEncoderMeta metadata = new DirectCompressorEncoderMeta();
+      metadata.readFields(in);
+      return new DirectCompressCodec(metadata.getDataType()).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_INTEGRAL) {
+      AdaptiveIntegralEncoderMeta metadata = new AdaptiveIntegralEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
+          stats).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
+      AdaptiveDeltaIntegralEncoderMeta metadata = new AdaptiveDeltaIntegralEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveDeltaIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
+          stats).createDecoder(metadata);
+    } else if (encoding == RLE_INTEGRAL) {
+      RLEEncoderMeta metadata = new RLEEncoderMeta();
+      metadata.readFields(in);
+      return new RLECodec().createDecoder(metadata);
+    } else {
+      // for backward compatibility
+      ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
+      return createDecoderLegacy(metadata);
+    }
+  }
+
+  /**
+   * Old way of creating decoder, based on algorithm
    */
-  public ColumnPageCodec newCodec(SimpleStatsResult stats) {
-    switch (stats.getDataType()) {
+  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+    SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+    switch (metadata.getType()) {
       case BYTE:
       case SHORT:
       case INT:
       case LONG:
-        return newCodecForIntegralType(stats);
+        return DefaultEncodingStrategy.selectCodecByAlgorithm(stats).createDecoder(null);
       case FLOAT:
       case DOUBLE:
-        return newCodecForFloatingType(stats);
       case DECIMAL:
-        return newCodecForDecimalType(stats);
       case BYTE_ARRAY:
         // no dictionary dimension
-        return newCodecForByteArrayType(stats);
+        return new DirectCompressCodec(stats.getDataType()).createDecoder(
+            new DirectCompressorEncoderMeta("snappy", stats.getDataType(), stats));
       default:
         throw new RuntimeException("unsupported data type: " + stats.getDataType());
     }
   }
-
-  /**
-   * create codec based on the page data type and statistics contained by ValueEncoderMeta
-   */
-  public ColumnPageCodec newCodec(ValueEncoderMeta meta) {
-    if (meta instanceof ColumnPageCodecMeta) {
-      ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta;
-      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta);
-      switch (codecMeta.getSrcDataType()) {
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-          return newCodecForIntegralType(stats);
-        case FLOAT:
-        case DOUBLE:
-          return newCodecForFloatingType(stats);
-        case DECIMAL:
-          return newCodecForDecimalType(stats);
-        case BYTE_ARRAY:
-          // no dictionary dimension
-          return newCodecForByteArrayType(stats);
-        default:
-          throw new RuntimeException("unsupported data type: " + stats.getDataType());
-      }
-    } else {
-      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(meta);
-      switch (meta.getType()) {
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-          return newCodecForIntegralType(stats);
-        case FLOAT:
-        case DOUBLE:
-          return newCodecForFloatingType(stats);
-        case DECIMAL:
-          return newCodecForDecimalType(stats);
-        case BYTE_ARRAY:
-          // no dictionary dimension
-          return newCodecForByteArrayType(stats);
-        default:
-          throw new RuntimeException("unsupported data type: " + stats.getDataType());
-      }
-    }
-  }
-
-  // for byte, short, int, long
-  abstract ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats);
-
-  // for float, double
-  abstract ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats);
-
-  // for decimal
-  abstract ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats);
-
-  // for byte array
-  abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats);
-
-  // for dimension column
-  public abstract ColumnPageCodec newCodec(TableSpec.DimensionSpec dimensionSpec);
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
new file mode 100644
index 0000000..56527cb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+/**
+ * Factory to create Encoding Strategy.
+ * Now only a default strategy is supported which will choose encoding based on data type
+ * and column data stats.
+ */
+public class EncodingStrategyFactory {
+
+  private static EncodingStrategy defaultStrategy = new DefaultEncodingStrategy();
+
+  public static EncodingStrategy getStrategy() {
+    // TODO: make it configurable after added new strategy
+    return defaultStrategy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
deleted file mode 100644
index c1620c6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.DimensionType;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class HighCardDictDimensionIndexCodec  extends IndexStorageCodec {
-
-  HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
-  }
-
-  @Override
-  public String getName() {
-    return "HighCardDictDimensionIndexCodec";
-  }
-
-  @Override
-  public EncodedColumnPage encode(ColumnPage input) throws MemoryException {
-    IndexStorage indexStorage;
-    byte[][] data = input.getByteArrayPage();
-    if (isInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort);
-      } else {
-        indexStorage = new BlockIndexerStorageForInt(data, false, true, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
-      } else {
-        indexStorage = new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-    byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
-    byte[] compressed = compressor.compressByte(flattened);
-    return new EncodedDimensionPage(input.getPageSize(), compressed, indexStorage,
-        DimensionType.PLAIN_VALUE);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
deleted file mode 100644
index 3122b15..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-public abstract class IndexStorageCodec implements ColumnPageCodec {
-  protected ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
-  protected Compressor compressor;
-  protected boolean isSort;
-  protected boolean isInvertedIndex;
-
-  IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
-    this.isSort = isSort;
-    this.isInvertedIndex = isInvertedIndex;
-    this.compressor = compressor;
-  }
-
-  @Override
-  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
-    throw new UnsupportedOperationException("internal error");
-  }
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    throw new UnsupportedOperationException("internal error");
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
deleted file mode 100644
index dda89e0..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CodecMetaFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * RLE encoding implementation for integral column page.
- * This encoding keeps track of repeated-run and non-repeated-run, and make use
- * of the highest bit of the length field to indicate the type of run.
- * The length field is encoded as 16 bits value. (Page size must be less than 65535 rows)
- *
- * For example: input data {5, 5, 1, 2, 3, 3, 3, 3, 3} will be encoded to
- * {0x00, 0x02, 0x05,             (repeated-run, 2 values of 5)
- *  0x80, 0x03, 0x01, 0x02, 0x03, (non-repeated-run, 3 values: 1, 2, 3)
- *  0x00, 0x04, 0x03}             (repeated-run, 4 values of 3)
- */
-public class RLECodec implements ColumnPageCodec {
-
-  enum RUN_STATE { INIT, START, REPEATED_RUN, NONREPEATED_RUN }
-
-  private DataType dataType;
-  private int pageSize;
-
-  /**
-   * New RLECodec
-   * @param dataType data type of the raw column page before encode
-   * @param pageSize page size of the raw column page before encode
-   */
-  RLECodec(DataType dataType, int pageSize) {
-    this.dataType = dataType;
-    this.pageSize = pageSize;
-  }
-
-  @Override
-  public String getName() {
-    return "RLECodec";
-  }
-
-  @Override
-  public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
-    Encoder encoder = new Encoder();
-    return encoder.encode(input);
-  }
-
-  @Override
-  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
-    throw new UnsupportedOperationException("complex column does not support RLE encoding");
-  }
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException,
-      IOException {
-    Decoder decoder = new Decoder(dataType, pageSize);
-    return decoder.decode(input, offset, length);
-  }
-
-  // This codec supports integral type only
-  private void validateDataType(DataType dataType) {
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        break;
-      default:
-        throw new UnsupportedOperationException(dataType + " is not supported for RLE");
-    }
-  }
-
-  private class Encoder {
-    // While encoding RLE, this class internally work as a state machine
-    // INIT state is the initial state before any value comes
-    // START state is the start for each run
-    // REPEATED_RUN state means it is collecting repeated values (`lastValue`)
-    // NONREPEATED_RUN state means it is collecting non-repeated values (`nonRepeatValues`)
-    private RUN_STATE runState;
-
-    // count for each run, either REPEATED_RUN or NONREPEATED_RUN
-    private short valueCount;
-
-    // collected value for REPEATED_RUN
-    private Object lastValue;
-
-    // collected value for NONREPEATED_RUN
-    private List<Object> nonRepeatValues;
-
-    // data type of input page
-    private DataType dataType;
-
-    // output stream for encoded data
-    private ByteArrayOutputStream bao;
-    private DataOutputStream stream;
-
-    private Encoder() {
-      this.runState = RUN_STATE.INIT;
-      this.valueCount = 0;
-      this.nonRepeatValues = new ArrayList<>();
-      this.bao = new ByteArrayOutputStream();
-      this.stream = new DataOutputStream(bao);
-    }
-
-    private EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
-      validateDataType(input.getDataType());
-      this.dataType = input.getDataType();
-      switch (dataType) {
-        case BYTE:
-          byte[] bytePage = input.getBytePage();
-          for (int i = 0; i < bytePage.length; i++) {
-            putValue(bytePage[i]);
-          }
-          break;
-        case SHORT:
-          short[] shortPage = input.getShortPage();
-          for (int i = 0; i < shortPage.length; i++) {
-            putValue(shortPage[i]);
-          }
-          break;
-        case INT:
-          int[] intPage = input.getIntPage();
-          for (int i = 0; i < intPage.length; i++) {
-            putValue(intPage[i]);
-          }
-          break;
-        case LONG:
-          long[] longPage = input.getLongPage();
-          for (int i = 0; i < longPage.length; i++) {
-            putValue(longPage[i]);
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException(input.getDataType() +
-              " does not support RLE encoding");
-      }
-      byte[] encoded = collectResult();
-      SimpleStatsResult stats = (SimpleStatsResult) input.getStatistics();
-      return new EncodedMeasurePage(
-          input.getPageSize(),
-          encoded,
-          CodecMetaFactory.createMeta(stats, input.getDataType()),
-          stats.getNullBits());
-    }
-
-    private void putValue(Object value) throws IOException {
-      if (runState == RUN_STATE.INIT) {
-        startNewRun(value);
-      } else {
-        if (lastValue.equals(value)) {
-          putRepeatValue(value);
-        } else {
-          putNonRepeatValue(value);
-        }
-      }
-    }
-
-    // when last row is reached, write out all collected data
-    private byte[] collectResult() throws IOException {
-      switch (runState) {
-        case REPEATED_RUN:
-          writeRunLength(valueCount);
-          writeRunValue(lastValue);
-          break;
-        case NONREPEATED_RUN:
-          writeRunLength(valueCount | 0x8000);
-          for (int i = 0; i < valueCount; i++) {
-            writeRunValue(nonRepeatValues.get(i));
-          }
-          break;
-        default:
-          assert (runState == RUN_STATE.START);
-          writeRunLength(1);
-          writeRunValue(lastValue);
-      }
-      return bao.toByteArray();
-    }
-
-    private void writeRunLength(int length) throws IOException {
-      stream.writeShort(length);
-    }
-
-    private void writeRunValue(Object value) throws IOException {
-      switch (dataType) {
-        case BYTE:
-          stream.writeByte((byte) value);
-          break;
-        case SHORT:
-          stream.writeShort((short) value);
-          break;
-        case INT:
-          stream.writeInt((int) value);
-          break;
-        case LONG:
-          stream.writeLong((long) value);
-          break;
-        default:
-          throw new RuntimeException("internal error");
-      }
-    }
-
-    // for each run, call this to initialize the state and clear the collected data
-    private void startNewRun(Object value) {
-      runState = RUN_STATE.START;
-      valueCount = 1;
-      lastValue = value;
-      nonRepeatValues.clear();
-      nonRepeatValues.add(value);
-    }
-
-    // non-repeated run ends, put the collected data to result page
-    private void encodeNonRepeatedRun() throws IOException {
-      // put the value count (highest bit is 1) and all collected values
-      writeRunLength(valueCount | 0x8000);
-      for (int i = 0; i < valueCount; i++) {
-        writeRunValue(nonRepeatValues.get(i));
-      }
-    }
-
-    // repeated run ends, put repeated value to result page
-    private void encodeRepeatedRun() throws IOException {
-      // put the value count (highest bit is 0) and repeated value
-      writeRunLength(valueCount);
-      writeRunValue(lastValue);
-    }
-
-    private void putRepeatValue(Object value) throws IOException {
-      switch (runState) {
-        case REPEATED_RUN:
-          valueCount++;
-          break;
-        case NONREPEATED_RUN:
-          // non-repeated run ends, encode this run
-          encodeNonRepeatedRun();
-          startNewRun(value);
-          break;
-        default:
-          assert (runState == RUN_STATE.START);
-          // enter repeated run
-          runState = RUN_STATE.REPEATED_RUN;
-          valueCount++;
-          break;
-      }
-    }
-
-    private void putNonRepeatValue(Object value) throws IOException {
-      switch (runState) {
-        case NONREPEATED_RUN:
-          // collect the non-repeated value
-          nonRepeatValues.add(value);
-          lastValue = value;
-          valueCount++;
-          break;
-        case REPEATED_RUN:
-          // repeated-run ends, encode this run
-          encodeRepeatedRun();
-          startNewRun(value);
-          break;
-        default:
-          assert (runState == RUN_STATE.START);
-          // enter non-repeated run
-          runState = RUN_STATE.NONREPEATED_RUN;
-          nonRepeatValues.add(value);
-          lastValue = value;
-          valueCount++;
-          break;
-      }
-    }
-
-  }
-
-  // It decodes data in one shot. It is suitable for scan query
-  // TODO: add a on-the-fly decoder for filter query with high selectivity
-  private class Decoder {
-
-    // src data type
-    private DataType dataType;
-    private int pageSize;
-
-    private Decoder(DataType dataType, int pageSize) throws MemoryException {
-      validateDataType(dataType);
-      this.dataType = dataType;
-      this.pageSize = pageSize;
-    }
-
-    private ColumnPage decode(byte[] input, int offset, int length)
-        throws MemoryException, IOException {
-      DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
-      ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize);
-      switch (dataType) {
-        case BYTE:
-          decodeBytePage(in, resultPage);
-          break;
-        case SHORT:
-          decodeShortPage(in, resultPage);
-          break;
-        case INT:
-          decodeIntPage(in, resultPage);
-          break;
-        case LONG:
-          decodeLongPage(in, resultPage);
-          break;
-        default:
-          throw new RuntimeException("unsupported datatype:" + dataType);
-      }
-      return resultPage;
-    }
-
-    private void decodeBytePage(DataInputStream in, ColumnPage decodedPage)
-        throws IOException {
-      int rowId = 0;
-      do {
-        int runLength = in.readShort();
-        int count = runLength & 0x7FFF;
-        if (runLength < 0) {
-          // non-repeated run
-          for (int i = 0; i < count; i++) {
-            decodedPage.putByte(rowId++, in.readByte());
-          }
-        } else {
-          // repeated run
-          byte value = in.readByte();
-          for (int i = 0; i < count; i++) {
-            decodedPage.putByte(rowId++, value);
-          }
-        }
-      } while (in.available() > 0);
-    }
-
-    private void decodeShortPage(DataInputStream in, ColumnPage decodedPage)
-        throws IOException {
-      int rowId = 0;
-      do {
-        int runLength = in.readShort();
-        int count = runLength & 0x7FFF;
-        if (runLength < 0) {
-          // non-repeated run
-          for (int i = 0; i < count; i++) {
-            decodedPage.putShort(rowId++, in.readShort());
-          }
-        } else {
-          // repeated run
-          short value = in.readShort();
-          for (int i = 0; i < count; i++) {
-            decodedPage.putShort(rowId++, value);
-          }
-        }
-      } while (in.available() > 0);
-    }
-
-    private void decodeIntPage(DataInputStream in, ColumnPage decodedPage)
-        throws IOException {
-      int rowId = 0;
-      do {
-        int runLength = in.readShort();
-        int count = runLength & 0x7FFF;
-        if (runLength < 0) {
-          // non-repeated run
-          for (int i = 0; i < count; i++) {
-            decodedPage.putInt(rowId++, in.readInt());
-          }
-        } else {
-          // repeated run
-          int value = in.readInt();
-          for (int i = 0; i < count; i++) {
-            decodedPage.putInt(rowId++, value);
-          }
-        }
-      } while (in.available() > 0);
-    }
-
-    private void decodeLongPage(DataInputStream in, ColumnPage decodedPage)
-        throws IOException {
-      int rowId = 0;
-      do {
-        int runLength = in.readShort();
-        int count = runLength & 0x7FFF;
-        if (runLength < 0) {
-          // non-repeated run
-          for (int i = 0; i < count; i++) {
-            decodedPage.putLong(rowId++, in.readLong());
-          }
-        } else {
-          // repeated run
-          long value = in.readLong();
-          for (int i = 0; i < count; i++) {
-            decodedPage.putLong(rowId++, value);
-          }
-        }
-      } while (in.available() > 0);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
new file mode 100644
index 0000000..94ca3e6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply
+ * and decode, it also employs compressor to compress the encoded data
+ */
+public abstract class AdaptiveCodec implements ColumnPageCodec {
+
+  // TODO: cache and reuse the same encoder since snappy is thread-safe
+
+  // statistics of this page, can be used by subclass
+  protected final SimpleStatsResult stats;
+
+  // the data type used for storage
+  protected final DataType targetDataType;
+
+  // the data type specified in schema
+  protected final DataType srcDataType;
+
+  protected AdaptiveCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    this.stats = stats;
+    this.srcDataType = srcDataType;
+    this.targetDataType = targetDataType;
+  }
+
+  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s[src type: %s, target type: %s, stats(%s)]",
+        getClass().getName(), srcDataType, targetDataType, stats);
+  }
+
+  protected String debugInfo() {
+    return this.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
new file mode 100644
index 0000000..9107a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will calculate delta of page max value and page value,
+ * and do type casting of the diff to make storage minimum.
+ */
+public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
+
+  private ColumnPage encodedPage;
+  private long max;
+
+  public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(srcDataType, targetDataType, stats);
+    switch (srcDataType) {
+      case BYTE:
+        this.max = (byte) stats.getMax();
+        break;
+      case SHORT:
+        this.max = (short) stats.getMax();
+        break;
+      case INT:
+        this.max = (int) stats.getMax();
+        break;
+      case LONG:
+        this.max = (long) stats.getMax();
+        break;
+      default:
+        // this codec is for integer type only
+        throw new UnsupportedOperationException(
+            "unsupported data type for Delta compress: " + srcDataType);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "DeltaIntegralCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new ColumnPageEncoder() {
+      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        if (encodedPage != null) {
+          throw new IllegalStateException("already encoded");
+        }
+        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        input.convertValue(converter);
+        byte[] result = encodedPage.compress(compressor);
+        encodedPage.freeMemory();
+        return result;
+      }
+
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new AdaptiveDeltaIntegralEncoderMeta(
+            compressor.getName(), targetDataType, inputPage.getStatistics());
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(Encoding.ADAPTIVE_DELTA_INTEGRAL);
+        return encodings;
+      }
+
+    };
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    AdaptiveDeltaIntegralEncoderMeta codecMeta = (AdaptiveDeltaIntegralEncoderMeta) meta;
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        codecMeta.getCompressorName());
+    return new ColumnPageDecoder() {
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+        DeltaIntegralConverter converter = new DeltaIntegralConverter(page, targetDataType,
+            srcDataType, stats.getMax());
+        return LazyColumnPage.newPage(page, converter);
+      }
+    };
+  }
+
+  private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, max - value);
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error");
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
new file mode 100644
index 0000000..1d20481
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+public class AdaptiveDeltaIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+  public AdaptiveDeltaIntegralEncoderMeta() {
+  }
+
+  AdaptiveDeltaIntegralEncoderMeta(String compressorName, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(targetDataType, stats, compressorName);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
new file mode 100644
index 0000000..3104dd6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata for AdaptiveIntegralCodec and DeltaIntegralCodec
+ */
+public class AdaptiveEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+
+  private DataType targetDataType;
+  private String compressorName;
+
+  AdaptiveEncoderMeta() {
+
+  }
+
+  AdaptiveEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+      String compressorName) {
+    super(stats.getDataType(), stats);
+    this.targetDataType = targetDataType;
+    this.compressorName = compressorName;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeByte(targetDataType.ordinal());
+    out.writeUTF(compressorName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.targetDataType = DataType.valueOf(in.readByte());
+    this.compressorName = in.readUTF();
+  }
+
+  public DataType getTargetDataType() {
+    return targetDataType;
+  }
+
+  public String getCompressorName() {
+    return compressorName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
new file mode 100644
index 0000000..ccd92d1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This converter will do type casting on page data to make storage minimum.
+ */
+public class AdaptiveIntegralCodec extends AdaptiveCodec {
+
+  private ColumnPage encodedPage;
+
+  public AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(srcDataType, targetDataType, stats);
+  }
+
+  @Override
+  public String getName() {
+    return "AdaptiveIntegralCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    return new ColumnPageEncoder() {
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        if (encodedPage != null) {
+          throw new IllegalStateException("already encoded");
+        }
+        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        input.convertValue(converter);
+        byte[] result = encodedPage.compress(compressor);
+        encodedPage.freeMemory();
+        return result;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<Encoding>();
+        encodings.add(Encoding.ADAPTIVE_INTEGRAL);
+        return encodings;
+      }
+
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new AdaptiveIntegralEncoderMeta(targetDataType, stats, compressor.getName());
+      }
+
+    };
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    AdaptiveIntegralEncoderMeta codecMeta = (AdaptiveIntegralEncoderMeta) meta;
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        codecMeta.getCompressorName());
+    final DataType targetDataType = codecMeta.getTargetDataType();
+    return new ColumnPageDecoder() {
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+        return LazyColumnPage.newPage(page, converter);
+      }
+    };
+  }
+
+  // encoded value = (type cast page value to target data type)
+  private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
new file mode 100644
index 0000000..3025303
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+  public AdaptiveIntegralEncoderMeta() {
+  }
+
+  public AdaptiveIntegralEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+      String compressorName) {
+    super(targetDataType, stats, compressorName);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
new file mode 100644
index 0000000..8a2bf6d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class DeltaIntegralConverter implements ColumnPageValueConverter {
+  private DataType targetDataType;
+  private ColumnPage encodedPage;
+  private long max;
+
+  public DeltaIntegralConverter(ColumnPage encodedPage, DataType targetDataType,
+      DataType srcDataType, Object max) {
+    this.targetDataType = targetDataType;
+    this.encodedPage = encodedPage;
+    switch (srcDataType) {
+      case BYTE:
+        this.max = (byte) max;
+        break;
+      case SHORT:
+        this.max = (short) max;
+        break;
+      case INT:
+        this.max = (int) max;
+        break;
+      case LONG:
+        this.max = (long) max;
+        break;
+      case FLOAT:
+      case DOUBLE:
+        this.max = (long)(max);
+        break;
+    }
+  }
+
+  @Override
+  public void encode(int rowId, byte value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, short value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, int value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, long value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, max - value);
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, float value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, (long)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, double value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, (long)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public long decodeLong(byte value) {
+    return max - value;
+  }
+
+  @Override
+  public long decodeLong(short value) {
+    return max - value;
+  }
+
+  @Override
+  public long decodeLong(int value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(byte value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(short value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(int value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(long value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(float value) {
+    // this codec is for integer type only
+    throw new RuntimeException("internal error");
+  }
+
+  @Override
+  public double decodeDouble(double value) {
+    // this codec is for integer type only
+    throw new RuntimeException("internal error");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
new file mode 100644
index 0000000..1fc61ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.compress;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * This codec directly apply compression on the input data
+ */
+public class DirectCompressCodec implements ColumnPageCodec {
+
+  private DataType dataType;
+
+  public DirectCompressCodec(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public String getName() {
+    return "DirectCompressCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    // TODO: make compressor configurable in create table
+    return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    DirectCompressorEncoderMeta codecMeta = (DirectCompressorEncoderMeta) meta;
+    return new DirectDecompressor(codecMeta.getCompressorName(),
+        codecMeta.getScale(), codecMeta.getPrecision());
+  }
+
+  private class DirectCompressor extends ColumnPageEncoder {
+
+    private Compressor compressor;
+
+    DirectCompressor(String compressorName) {
+      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+    }
+
+    @Override
+    protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+      return input.compress(compressor);
+    }
+
+    @Override
+    protected List<Encoding> getEncodingList() {
+      List<Encoding> encodings = new ArrayList<>();
+      encodings.add(Encoding.DIRECT_COMPRESS);
+      return encodings;
+    }
+
+    @Override
+    protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+      return new DirectCompressorEncoderMeta(compressor.getName(), inputPage.getDataType(),
+          inputPage.getStatistics());
+    }
+
+  }
+
+  private class DirectDecompressor implements ColumnPageDecoder {
+
+    private Compressor compressor;
+    private int scale;
+    private int precision;
+
+    DirectDecompressor(String compressorName, int scale, int precision) {
+      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+      this.scale = scale;
+      this.precision = precision;
+    }
+
+    @Override
+    public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+      ColumnPage decodedPage;
+      if (dataType == DataType.DECIMAL) {
+        decodedPage = ColumnPage.decompressDecimalPage(compressor, input, offset, length,
+            scale, precision);
+      } else {
+        decodedPage = ColumnPage.decompress(compressor, dataType, input, offset, length);
+      }
+      return LazyColumnPage.newPage(decodedPage, converter);
+    }
+  }
+
+  private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+    @Override
+    public void encode(int rowId, byte value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      throw new RuntimeException("internal error");
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      return value;
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
new file mode 100644
index 0000000..cf19259
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+public class DirectCompressorEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+  private String compressorName;
+
+  public DirectCompressorEncoderMeta() {
+  }
+
+  public DirectCompressorEncoderMeta(String compressorName, final DataType dataType,
+      SimpleStatsResult stats) {
+    super(dataType, stats);
+    this.compressorName = compressorName;
+  }
+
+  public String getCompressorName() {
+    return compressorName;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeUTF(compressorName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    compressorName = in.readUTF();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
new file mode 100644
index 0000000..e37b8f6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+public class ComplexDimensionIndexCodec extends IndexStorageCodec {
+
+  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+      Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "ComplexDimensionIndexCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new IndexStorageEncoder() {
+      @Override
+      void encodeIndexStorage(ColumnPage inputPage) {
+        IndexStorage indexStorage =
+            new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
+        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        byte[] compressed = compressor.compressByte(flattened);
+        super.indexStorage = indexStorage;
+        super.compressedDataPage = compressed;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(Encoding.DICTIONARY);
+        encodings.add(Encoding.INVERTED_INDEX);
+        return encodings;
+      }
+
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
new file mode 100644
index 0000000..d157654
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+public class DictDimensionIndexCodec extends IndexStorageCodec {
+
+  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "DictDimensionIndexCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new IndexStorageEncoder() {
+      @Override
+      void encodeIndexStorage(ColumnPage inputPage) {
+        IndexStorage indexStorage;
+        byte[][] data = inputPage.getByteArrayPage();
+        if (isInvertedIndex) {
+          indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
+        } else {
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+        }
+        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        super.compressedDataPage = compressor.compressByte(flattened);
+        super.indexStorage = indexStorage;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(Encoding.DICTIONARY);
+        encodings.add(Encoding.RLE);
+        if (isInvertedIndex) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
+        return encodings;
+      }
+
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
new file mode 100644
index 0000000..1e5015b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
+
+  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+      Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "DirectDictDimensionIndexCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new IndexStorageEncoder() {
+      @Override
+      void encodeIndexStorage(ColumnPage inputPage) {
+        IndexStorage indexStorage;
+        byte[][] data = inputPage.getByteArrayPage();
+        if (isInvertedIndex) {
+          indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
+        } else {
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+        }
+        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        super.compressedDataPage = compressor.compressByte(flattened);
+        super.indexStorage = indexStorage;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(Encoding.DICTIONARY);
+        encodings.add(Encoding.RLE);
+        if (isInvertedIndex) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
+        return encodings;
+      }
+    };
+  }
+
+}