You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:12 UTC

[06/56] [abbrv] carbondata git commit: add EncodingStrategy

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
new file mode 100644
index 0000000..31eb7ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+// Transformation type that can be applied to ColumnPage
+public interface PrimitiveCodec {
+  void encode(int rowId, byte value);
+  void encode(int rowId, short value);
+  void encode(int rowId, int value);
+  void encode(int rowId, long value);
+  void encode(int rowId, float value);
+  void encode(int rowId, double value);
+
+  long decodeLong(byte value);
+  long decodeLong(short value);
+  long decodeLong(int value);
+  double decodeDouble(byte value);
+  double decodeDouble(short value);
+  double decodeDouble(int value);
+  double decodeDouble(long value);
+  double decodeDouble(float value);
+  double decodeDouble(double value);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
new file mode 100644
index 0000000..c843b55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply
+ * and decode, it also employs compressor to compress the encoded data
+ */
+public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
+
+  // TODO: cache and reuse the same encoder since snappy is thread-safe
+
+  // compressor that can be used by subclass
+  protected final Compressor compressor;
+
+  // statistics of this page, can be used by subclass
+  protected final ColumnPageStatsVO stats;
+
+  // the data type used for storage
+  protected final DataType targetDataType;
+
+  // the data type specified in schema
+  protected final DataType srcDataType;
+
+  protected AdaptiveCompressionCodec(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    this.stats = stats;
+    this.srcDataType = srcDataType;
+    this.targetDataType = targetDataType;
+    this.compressor = compressor;
+  }
+
+  public abstract String getName();
+
+  public abstract byte[] encode(ColumnPage input);
+
+  public abstract ColumnPage decode(byte[] input, int offset, int length);
+
+  @Override
+  public String toString() {
+    return String.format("%s[src type: %s, target type: %s, stats(%s)]",
+        getClass().getName(), srcDataType, targetDataType, stats);
+  }
+
+  protected String debugInfo() {
+    return this.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
new file mode 100644
index 0000000..f768a14
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will do type casting on page data to make storage minimum.
+ */
+class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+
+  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    return new AdaptiveIntegerCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private AdaptiveIntegerCodec(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "AdaptiveIntegerCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) {
+    if (srcDataType.equals(targetDataType)) {
+      return input.compress(compressor);
+    } else {
+      encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+      input.encode(codec);
+      return encodedPage.compress(compressor);
+    }
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) {
+    if (srcDataType.equals(targetDataType)) {
+      return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    } else {
+      ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+      return LazyColumnPage.newPage(page, codec);
+    }
+  }
+
+  // encoded value = (type cast page value to target data type)
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
deleted file mode 100644
index e870ad6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- *  Codec for a column page data, implementation should not keep state across pages,
- *  caller will use the same object to encode multiple pages.
- */
-public interface ColumnCodec {
-
-  /** Codec name will be stored in BlockletHeader (DataChunk3) */
-  String getName();
-
-  byte[] encode(ColumnPage columnPage);
-
-  ColumnPage decode(byte[] encoded);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
new file mode 100644
index 0000000..21913be
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ *  Codec for a column page data, implementation should not keep state across pages,
+ *  caller may use the same object to apply multiple pages.
+ */
+public interface ColumnPageCodec {
+
+  /**
+   * Codec name will be stored in BlockletHeader (DataChunk3)
+   */
+  String getName();
+
+  /**
+   * apply a column page and output encoded byte array
+   * @param input column page to apply
+   * @return encoded data
+   */
+  byte[] encode(ColumnPage input);
+
+  /**
+   * decode byte array from offset to a column page
+   * @param input encoded byte array
+   * @param offset startoffset of the input to decode
+   * @param length length of data to decode
+   * @return decoded data
+   */
+  ColumnPage decode(byte[] input, int offset, int length);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
new file mode 100644
index 0000000..4568503
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for variable length data type (decimal, string).
+ * This codec will flatten the variable length data before applying compression.
+ */
+public class CompressionCodec implements ColumnPageCodec {
+
+  private Compressor compressor;
+  private DataType dataType;
+
+  protected CompressionCodec(DataType dataType, Compressor compressor) {
+    this.compressor = compressor;
+    this.dataType = dataType;
+  }
+
+  public static CompressionCodec newInstance(DataType dataType, Compressor compressor) {
+    return new CompressionCodec(dataType, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "CompressionCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) {
+    return input.compress(compressor);
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) {
+    return ColumnPage.decompress(compressor, dataType, input, offset, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
new file mode 100644
index 0000000..3dfcf94
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Default strategy will select encoding base on column page data type and statistics
+ */
+public class DefaultEncodingStrategy extends EncodingStrategy {
+
+  private static final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  // fit the long input value into minimum data type
+  public static DataType fitDataType(long value) {
+    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+      return DataType.BYTE;
+    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+      return DataType.SHORT;
+    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+      return DataType.INT;
+    } else {
+      return DataType.LONG;
+    }
+  }
+
+  protected DataType fitDataType(long max, long min) {
+    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
+      return DataType.BYTE;
+    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
+      return DataType.SHORT;
+    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
+      return DataType.INT;
+    } else {
+      return DataType.LONG;
+    }
+  }
+
+  // fit the input double value into minimum data type
+  protected DataType fitDataType(double value, int decimal) {
+    DataType dataType = DataType.DOUBLE;
+    if (decimal == 0) {
+      if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+        dataType = DataType.BYTE;
+      } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+        dataType = DataType.SHORT;
+      } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+        dataType = DataType.INT;
+      } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
+        dataType = DataType.LONG;
+      }
+    }
+    return dataType;
+  }
+
+  // choose between adaptive encoder or delta adaptive encoder, based on whose target data type
+  // size is smaller
+  @Override
+  ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) {
+    DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin());
+    DataType deltaDataType;
+
+    // TODO: this handling is for data compatibility, change to Override check when implementing
+    // encoding override feature
+    if (adaptiveDataType == DataType.LONG) {
+      deltaDataType = DataType.LONG;
+    } else {
+      deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin());
+    }
+    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+      // choose adaptive encoding
+      return AdaptiveIntegerCodec.newInstance(
+          stats.getDataType(), adaptiveDataType, stats, compressor);
+    } else {
+      // choose delta adaptive encoding
+      return DeltaIntegerCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor);
+    }
+  }
+
+  // choose between upscale adaptive encoder or upscale delta adaptive encoder,
+  // based on whose target data type size is smaller
+  @Override
+  ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
+    DataType srcDataType = stats.getDataType();
+    double maxValue = (double) stats.getMax();
+    double minValue = (double) stats.getMin();
+    int decimal = stats.getDecimal();
+
+    //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
+    //but we can't use -1 to getDatatype, we should use -10000000.
+    double absMaxValue = Math.abs(maxValue) >= Math.abs(minValue) ? maxValue : minValue;
+
+    if (decimal == 0) {
+      // short, int, long
+      DataType adaptiveDataType = fitDataType(absMaxValue, decimal);
+      DataType deltaDataType = fitDataType(maxValue - minValue, decimal);
+      if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+        // choose adaptive encoding
+        return AdaptiveIntegerCodec.newInstance(srcDataType, adaptiveDataType, stats, compressor);
+      } else {
+        // choose delta adaptive encoding
+        return DeltaIntegerCodec.newInstance(srcDataType, deltaDataType, stats, compressor);
+      }
+    } else {
+      // double
+      DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, 0);
+      DataType upscaleDiffDataType = fitDataType(Math.pow(10, decimal) * (maxValue - minValue), 0);
+      if (upscaleAdaptiveDataType.getSizeInBytes() <= upscaleDiffDataType.getSizeInBytes()) {
+        return UpscaleFloatingCodec.newInstance(
+            srcDataType, upscaleAdaptiveDataType, stats, compressor);
+      } else {
+        return UpscaleDeltaFloatingCodec.newInstance(
+            srcDataType, upscaleDiffDataType, stats, compressor);
+      }
+    }
+  }
+
+  // for decimal, currently it is a very basic implementation
+  @Override
+  ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
+    return CompressionCodec.newInstance(stats.getDataType(), compressor);
+  }
+
+  @Override
+  ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
+    return CompressionCodec.newInstance(stats.getDataType(), compressor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
new file mode 100644
index 0000000..e8e7779
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will calculate delta of page max value and page value,
+ * and do type casting of the diff to make storage minimum.
+ */
+public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+
+  private long max;
+
+  public static DeltaIntegerCodec newInstance(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    return new DeltaIntegerCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private DeltaIntegerCodec(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+    switch (srcDataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        max = (long) stats.getMax();
+        break;
+      case FLOAT:
+      case DOUBLE:
+        max = (long)((double) stats.getMax());
+        break;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "DeltaIntegerCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) {
+    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    input.encode(codec);
+    return encodedPage.compress(compressor);
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) {
+    ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    return LazyColumnPage.newPage(page, codec);
+  }
+
+  // encoded value = (max value of page) - (page value)
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, max - value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
deleted file mode 100644
index 0dd23c7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-public class DummyCodec implements ColumnCodec {
-  @Override
-  public String getName() {
-    return "DummyCodec";
-  }
-
-  @Override
-  public byte[] encode(ColumnPage columnPage) {
-    return null;
-  }
-
-  @Override
-  public ColumnPage decode(byte[] encoded) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
new file mode 100644
index 0000000..0d1b2e4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+
+// result result of all columns
+public class EncodedData {
+  // dimension data that include rowid (index)
+  public IndexStorage[] indexStorages;
+
+  // encoded and compressed dimension data
+  public byte[][] dimensions;
+
+  // encoded and compressed measure data
+  public byte[][] measures;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
new file mode 100644
index 0000000..49fb625
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+
+/**
+ * Base class for encoding strategy implementation.
+ */
+public abstract class EncodingStrategy {
+
+  /**
+   * create codec based on the page data type and statistics
+   */
+  public ColumnPageCodec createCodec(ColumnPageStatsVO stats) {
+    switch (stats.getDataType()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return newCodecForIntegerType(stats);
+      case FLOAT:
+      case DOUBLE:
+        return newCodecForFloatingType(stats);
+      case DECIMAL:
+        return newCodecForDecimalType(stats);
+      case BYTE_ARRAY:
+        // no dictionary dimension
+        return newCodecForByteArrayType(stats);
+      default:
+        throw new RuntimeException("unsupported data type: " + stats.getDataType());
+    }
+  }
+
+  /**
+   * create codec based on the page data type and statistics contained by ValueEncoderMeta
+   */
+  public ColumnPageCodec createCodec(ValueEncoderMeta meta) {
+    ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta);
+    return createCodec(stats);
+  }
+
+  // for byte, short, int, long
+  abstract ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats);
+
+  // for float, double
+  abstract ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats);
+
+  // for decimal
+  abstract ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats);
+
+  // for byte array
+  abstract ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
new file mode 100644
index 0000000..c58a96f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for floating point (float, double) data type page.
+ * This codec will upscale (multiple page value by decimal) to integer value,
+ * and do type casting to make storage minimum.
+ */
+public class UpscaleDeltaFloatingCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+
+  private BigDecimal max;
+  private double factor;
+
+  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    return new UpscaleDeltaFloatingCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private UpscaleDeltaFloatingCodec(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+    this.max = BigDecimal.valueOf((double) stats.getMax());
+    this.factor = Math.pow(10, stats.getDecimal());
+  }
+
+  @Override
+  public String getName() {
+    return "UpscaleDeltaFloatingCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) {
+    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    input.encode(codec);
+    return encodedPage.compress(compressor);
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) {
+    ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    return LazyColumnPage.newPage(page, codec);
+  }
+
+  // encoded value = (10 power of decimal) * ((max value of page) - (page value))
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      double diff = max.subtract(BigDecimal.valueOf(value)).doubleValue();
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(Math.round(factor * diff)));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(Math.round(factor * diff)));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(Math.round(factor * diff)));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(Math.round(factor * diff)));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      double diff = max.subtract(BigDecimal.valueOf(value)).doubleValue();
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(Math.round(factor * diff)));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(Math.round(factor * diff)));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(Math.round(factor * diff)));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(Math.round(factor * diff)));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue();
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue();
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue();
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue();
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
new file mode 100644
index 0000000..4f5ee13
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for floating point (float, double) data type page.
+ * This codec will upscale the diff from page max value to integer value,
+ * and do type casting to make storage minimum.
+ */
+public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+  private double factor;
+
+  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    return new UpscaleFloatingCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private UpscaleFloatingCodec(DataType srcDataType, DataType targetDataType,
+      ColumnPageStatsVO stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+    this.factor = Math.pow(10, stats.getDecimal());
+  }
+
+  @Override public String getName() {
+    return "UpscaleFloatingCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) {
+    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    input.encode(codec);
+    return encodedPage.compress(compressor);
+  }
+
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) {
+    ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    return LazyColumnPage.newPage(page, codec);
+  }
+
+  // encoded value = (10 power of decimal) * (page value)
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(Math.round(factor * value)));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(Math.round(factor * value)));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(Math.round(factor * value)));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(Math.round(factor * value)));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(Math.round(factor * value)));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(Math.round(factor * value)));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(Math.round(factor * value)));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(Math.round(factor * value)));
+          break;
+        case DOUBLE:
+          encodedPage.putDouble(rowId, (Math.round(factor * value)));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value / factor;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value / factor;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value / factor;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value / factor;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
deleted file mode 100644
index 960a530..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/** statics for one column page */
-public class ColumnPageStatistics {
-  private DataType dataType;
-
-  /** min and max value of the measures */
-  private Object min, max;
-
-  /**
-   * the unique value is the non-exist value in the row,
-   * and will be used as storage key for null values of measures
-   */
-  private Object nonExistValue;
-
-  /** decimal count of the measures */
-  private int decimal;
-
-  public ColumnPageStatistics(DataType dataType) {
-    this.dataType = dataType;
-    switch (dataType) {
-      case SHORT:
-      case INT:
-      case LONG:
-        max = Long.MIN_VALUE;
-        min = Long.MAX_VALUE;
-        nonExistValue = Long.MIN_VALUE;
-        break;
-      case DOUBLE:
-        max = Double.MIN_VALUE;
-        min = Double.MAX_VALUE;
-        nonExistValue = Double.MIN_VALUE;
-        break;
-      case DECIMAL:
-        max = new BigDecimal(Double.MIN_VALUE);
-        min = new BigDecimal(Double.MAX_VALUE);
-        nonExistValue = new BigDecimal(Double.MIN_VALUE);
-        break;
-    }
-    decimal = 0;
-  }
-
-  /**
-   * update the statistics for the input row
-   */
-  public void update(Object value) {
-    switch (dataType) {
-      case SHORT:
-        max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
-        min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
-        nonExistValue = (long) min - 1;
-        break;
-      case INT:
-        max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
-        min = ((long) min  < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
-        nonExistValue = (long) min - 1;
-        break;
-      case LONG:
-        max = ((long) max > (long) value) ? max : value;
-        min = ((long) min < (long) value) ? min : value;
-        nonExistValue = (long) min - 1;
-        break;
-      case DOUBLE:
-        max = ((double) max > (double) value) ? max : value;
-        min = ((double) min < (double) value) ? min : value;
-        int num = getDecimalCount((double) value);
-        decimal = decimal > num ? decimal : num;
-        nonExistValue = (double) min - 1;
-        break;
-      case DECIMAL:
-        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
-        decimal = decimalValue.scale();
-        BigDecimal val = (BigDecimal) min;
-        nonExistValue = (val.subtract(new BigDecimal(1.0)));
-        break;
-      case ARRAY:
-      case STRUCT:
-        // for complex type column, writer is not going to use stats, so, do nothing
-    }
-  }
-
-  /**
-   * return no of digit after decimal
-   */
-  private int getDecimalCount(double value) {
-    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
-    int integerPlaces = strValue.indexOf('.');
-    int decimalPlaces = 0;
-    if (-1 != integerPlaces) {
-      decimalPlaces = strValue.length() - integerPlaces - 1;
-    }
-    return decimalPlaces;
-  }
-
-  /**
-   * return min value as byte array
-   */
-  public byte[] minBytes() {
-    return getValueAsBytes(getMin());
-  }
-
-  /**
-   * return max value as byte array
-   */
-  public byte[] maxBytes() {
-    return getValueAsBytes(getMax());
-  }
-
-  /**
-   * convert value to byte array
-   */
-  private byte[] getValueAsBytes(Object value) {
-    ByteBuffer b;
-    switch (dataType) {
-      case DOUBLE:
-        b = ByteBuffer.allocate(8);
-        b.putDouble((Double) value);
-        b.flip();
-        return b.array();
-      case LONG:
-      case INT:
-      case SHORT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((Long) value);
-        b.flip();
-        return b.array();
-      case DECIMAL:
-        return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
-      default:
-        throw new IllegalArgumentException("Invalid data type");
-    }
-  }
-
-  public Object getMin() {
-    return min;
-  }
-
-  public Object getMax() {
-    return max;
-  }
-
-  public Object nonExistValue() {
-    return nonExistValue;
-  }
-
-  public int getDecimal() {
-    return decimal;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
new file mode 100644
index 0000000..a5b3148
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/** statics for one column page */
+public class ColumnPageStatsVO {
+  private DataType dataType;
+
+  /** min and max value of the measures */
+  private Object min, max;
+
+  /**
+   * the unique value is the non-exist value in the row,
+   * and will be used as storage key for null values of measures
+   */
+  private Object nonExistValue;
+
+  /** decimal count of the measures */
+  private int decimal;
+
+  public ColumnPageStatsVO(DataType dataType) {
+    this.dataType = dataType;
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        max = Long.MIN_VALUE;
+        min = Long.MAX_VALUE;
+        nonExistValue = Long.MIN_VALUE;
+        break;
+      case DOUBLE:
+        max = Double.MIN_VALUE;
+        min = Double.MAX_VALUE;
+        nonExistValue = Double.MIN_VALUE;
+        break;
+      case DECIMAL:
+        max = new BigDecimal(Double.MIN_VALUE);
+        min = new BigDecimal(Double.MAX_VALUE);
+        nonExistValue = new BigDecimal(Double.MIN_VALUE);
+        break;
+    }
+    decimal = 0;
+  }
+
+  public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta) {
+    ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType());
+    instance.min = meta.getMinValue();
+    instance.max = meta.getMaxValue();
+    instance.decimal = meta.getDecimal();
+    instance.nonExistValue = meta.getUniqueValue();
+    return instance;
+  }
+
+  /**
+   * update the statistics for the input row
+   */
+  public void update(Object value) {
+    switch (dataType) {
+      case SHORT:
+        max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
+        min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
+        nonExistValue = (long) min - 1;
+        break;
+      case INT:
+        max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
+        min = ((long) min  < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
+        nonExistValue = (long) min - 1;
+        break;
+      case LONG:
+        max = ((long) max > (long) value) ? max : value;
+        min = ((long) min < (long) value) ? min : value;
+        nonExistValue = (long) min - 1;
+        break;
+      case DOUBLE:
+        max = ((double) max > (double) value) ? max : value;
+        min = ((double) min < (double) value) ? min : value;
+        int num = getDecimalCount((double) value);
+        decimal = decimal > num ? decimal : num;
+        nonExistValue = (double) min - 1;
+        break;
+      case DECIMAL:
+        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
+        decimal = decimalValue.scale();
+        BigDecimal val = (BigDecimal) min;
+        nonExistValue = (val.subtract(new BigDecimal(1.0)));
+        break;
+      case ARRAY:
+      case STRUCT:
+        // for complex type column, writer is not going to use stats, so, do nothing
+    }
+  }
+
+  public void updateNull() {
+    switch (dataType) {
+      case SHORT:
+        max = ((long) max > 0) ? max : 0L;
+        min = ((long) min < 0) ? min : 0L;
+        nonExistValue = (long) min - 1;
+        break;
+      case INT:
+        max = ((long) max > 0) ? max : 0L;
+        min = ((long) min  < 0) ? min : 0L;
+        nonExistValue = (long) min - 1;
+        break;
+      case LONG:
+        max = ((long) max > 0) ? max : 0L;
+        min = ((long) min < 0) ? min : 0L;
+        nonExistValue = (long) min - 1;
+        break;
+      case DOUBLE:
+        max = ((double) max > 0d) ? max : 0d;
+        min = ((double) min < 0d) ? min : 0d;
+        int num = getDecimalCount(0d);
+        decimal = decimal > num ? decimal : num;
+        nonExistValue = (double) min - 1;
+        break;
+      case DECIMAL:
+        BigDecimal decimalValue = BigDecimal.ZERO;
+        decimal = decimalValue.scale();
+        BigDecimal val = (BigDecimal) min;
+        nonExistValue = (val.subtract(new BigDecimal(1.0)));
+        break;
+      case ARRAY:
+      case STRUCT:
+        // for complex type column, writer is not going to use stats, so, do nothing
+    }
+  }
+
+  /**
+   * return no of digit after decimal
+   */
+  private int getDecimalCount(double value) {
+    return BigDecimal.valueOf(value).scale();
+  }
+
+  /**
+   * return min value as byte array
+   */
+  public byte[] minBytes() {
+    return getValueAsBytes(getMin());
+  }
+
+  /**
+   * return max value as byte array
+   */
+  public byte[] maxBytes() {
+    return getValueAsBytes(getMax());
+  }
+
+  /**
+   * convert value to byte array
+   */
+  private byte[] getValueAsBytes(Object value) {
+    ByteBuffer b;
+    switch (dataType) {
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((Double) value);
+        b.flip();
+        return b.array();
+      case LONG:
+      case INT:
+      case SHORT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((Long) value);
+        b.flip();
+        return b.array();
+      case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
+  public Object getMin() {
+    return min;
+  }
+
+  public Object getMax() {
+    return max;
+  }
+
+  public Object nonExistValue() {
+    return nonExistValue;
+  }
+
+  public int getDecimal() {
+    return decimal;
+  }
+
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("min: %s, max: %s, decimal: %s ", min, max, decimal);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
index 33440f5..865ae04 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
@@ -40,7 +40,7 @@ public class MeasurePageStatsVO {
     dataType = new DataType[measurePages.length];
     selectedDataType = new byte[measurePages.length];
     for (int i = 0; i < measurePages.length; i++) {
-      ColumnPageStatistics stats = measurePages[i].getStatistics();
+      ColumnPageStatsVO stats = measurePages[i].getStatistics();
       min[i] = stats.getMin();
       max[i] = stats.getMax();
       nonExistValue[i] = stats.nonExistValue();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
index 107c52f..08dd800 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
@@ -67,7 +67,7 @@ public class WriteStepRowUtil {
     return new CarbonRow(converted);
   }
 
-  private static int[] getDictDimension(CarbonRow row) {
+  public static int[] getDictDimension(CarbonRow row) {
     return (int[]) row.getData()[DICTIONARY_DIMENSION];
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
new file mode 100644
index 0000000..5519f2d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+/**
+ * Holds the pointers for rows.
+ */
+public class IntPointerBuffer {
+
+  private int length;
+
+  private int actualSize;
+
+  private int[] pointerBlock;
+
+  private MemoryBlock baseBlock;
+
+  private MemoryBlock pointerMemoryBlock;
+
+  public IntPointerBuffer(MemoryBlock baseBlock) {
+    // TODO can be configurable, it is initial size and it can grow automatically.
+    this.length = 100000;
+    pointerBlock = new int[length];
+    this.baseBlock = baseBlock;
+  }
+
+  public IntPointerBuffer(int length) {
+    this.length = length;
+    pointerBlock = new int[length];
+  }
+
+  public void set(int index, int value) {
+    pointerBlock[index] = value;
+  }
+
+  public void set(int value) {
+    ensureMemory();
+    pointerBlock[actualSize] = value;
+    actualSize++;
+  }
+
+  /**
+   * Returns the value at position {@code index}.
+   */
+  public int get(int index) {
+    assert index >= 0 : "index (" + index + ") should >= 0";
+    assert index < length : "index (" + index + ") should < length (" + length + ")";
+    if (pointerBlock == null) {
+      return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(),
+          pointerMemoryBlock.getBaseOffset() + (index * 4));
+    }
+    return pointerBlock[index];
+  }
+
+  public void loadToUnsafe() throws MemoryException {
+    pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(pointerBlock.length * 4);
+    for (int i = 0; i < pointerBlock.length; i++) {
+      CarbonUnsafe.unsafe
+          .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
+              pointerBlock[i]);
+    }
+    pointerBlock = null;
+  }
+
+  public int getActualSize() {
+    return actualSize;
+  }
+
+  public MemoryBlock getBaseBlock() {
+    return baseBlock;
+  }
+
+  public int[] getPointerBlock() {
+    return pointerBlock;
+  }
+
+  private void ensureMemory() {
+    if (actualSize >= length) {
+      // Expand by quarter, may be we can correct the logic later
+      int localLength = length + (int) (length * (0.25));
+      int[] memoryAddress = new int[localLength];
+      System.arraycopy(pointerBlock, 0, memoryAddress, 0, length);
+      pointerBlock = memoryAddress;
+      length = localLength;
+    }
+  }
+
+  public void freeMemory() {
+    pointerBlock = null;
+    if (pointerMemoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
+    }
+    if (baseBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
new file mode 100644
index 0000000..af8e6e0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+public class MemoryException extends Exception {
+
+  public MemoryException(String message) {
+    super(message);
+  }
+
+  public MemoryException(Exception e) {
+    super(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
new file mode 100644
index 0000000..132a3fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Manages memory for instance.
+ */
+public class UnsafeMemoryManager {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
+
+  static {
+    long size;
+    try {
+      size = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+    } catch (Exception e) {
+      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
+      LOGGER.info("Wrong memory size given, "
+          + "so setting default value to " + size);
+    }
+    if (size < 1024) {
+      size = 1024;
+      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+          + "so setting default value to " + size);
+    }
+
+    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+    long takenSize = size * 1024 * 1024;
+    MemoryAllocator allocator;
+    if (offHeap) {
+      allocator = MemoryAllocator.UNSAFE;
+    } else {
+      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
+      if (takenSize > maxMemory) {
+        takenSize = maxMemory;
+      }
+      allocator = MemoryAllocator.HEAP;
+    }
+    INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+  }
+
+  public static final UnsafeMemoryManager INSTANCE;
+
+  private long totalMemory;
+
+  private long memoryUsed;
+
+  private MemoryAllocator allocator;
+
+  private long minimumMemory;
+
+  private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
+    this.totalMemory = totalMemory;
+    this.allocator = allocator;
+    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
+    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
+    if (totalWorkingMemoryForAllThreads >= totalMemory) {
+      throw new RuntimeException("Working memory should be less than total memory configured, "
+          + "so either reduce the loading threads or increase the memory size. "
+          + "(Number of threads * number of threads) should be less than total unsafe memory");
+    }
+    minimumMemory = totalWorkingMemoryForAllThreads;
+    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
+        + " and minimum reserve memory " + minimumMemory);
+  }
+
+  public synchronized MemoryBlock allocateMemory(long memoryRequested) {
+    if (memoryUsed + memoryRequested <= totalMemory) {
+      MemoryBlock allocate = allocator.allocate(memoryRequested);
+      memoryUsed += allocate.size();
+      LOGGER.info("Memory block is created with size "  + allocate.size() +
+          " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
+      return allocate;
+    }
+    return null;
+  }
+
+  public synchronized void freeMemory(MemoryBlock memoryBlock) {
+    allocator.free(memoryBlock);
+    memoryUsed -= memoryBlock.size();
+    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+    LOGGER.info(
+        "Memory released, memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
+  }
+
+  public synchronized long getAvailableMemory() {
+    return totalMemory - memoryUsed;
+  }
+
+  public boolean isMemoryAvailable() {
+    return getAvailableMemory() > minimumMemory;
+  }
+
+  public long getUsableMemory() {
+    return totalMemory - minimumMemory;
+  }
+
+  /**
+   * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
+   */
+  public static MemoryBlock allocateMemoryWithRetry(long size) throws MemoryException {
+    MemoryBlock baseBlock = null;
+    int tries = 0;
+    while (tries < 100) {
+      baseBlock = INSTANCE.allocateMemory(size);
+      if (baseBlock == null) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          throw new MemoryException(e);
+        }
+      } else {
+        break;
+      }
+      tries++;
+    }
+    if (baseBlock == null) {
+      throw new MemoryException("Not enough memory to create page");
+    }
+    return baseBlock;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 204ac1c..b5d175d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -19,28 +19,35 @@ package org.apache.carbondata.core.metadata.datatype;
 
 public enum DataType {
 
-  STRING(0, "STRING"),
-  DATE(1, "DATE"),
-  TIMESTAMP(2, "TIMESTAMP"),
-  BOOLEAN(1, "BOOLEAN"),
-  SHORT(2, "SMALLINT"),
-  INT(3, "INT"),
-  FLOAT(4, "FLOAT"),
-  LONG(5, "BIGINT"),
-  DOUBLE(6, "DOUBLE"),
-  NULL(7, "NULL"),
-  DECIMAL(8, "DECIMAL"),
-  ARRAY(9, "ARRAY"),
-  STRUCT(10, "STRUCT"),
-  MAP(11, "MAP"),
-  BYTE(12, "BYTE");
+  STRING(0, "STRING", -1),
+  DATE(1, "DATE", -1),
+  TIMESTAMP(2, "TIMESTAMP", -1),
+  BOOLEAN(1, "BOOLEAN", 1),
+  SHORT(2, "SMALLINT", 2),
+  INT(3, "INT", 4),
+  FLOAT(4, "FLOAT", 4),
+  LONG(5, "BIGINT", 8),
+  DOUBLE(6, "DOUBLE", 8),
+  NULL(7, "NULL", 1),
+  DECIMAL(8, "DECIMAL", -1),
+  ARRAY(9, "ARRAY", -1),
+  STRUCT(10, "STRUCT", -1),
+  MAP(11, "MAP", -1),
+  BYTE(12, "BYTE", 1),
+
+  // internal use only
+  BYTE_ARRAY(13, "BYTE_ARRAY", -1);
 
   private int precedenceOrder;
-  private String name ;
+  private String name;
+
+  // size of the value of this data type, negative value means variable length
+  private int sizeInBytes;
 
-  DataType(int value ,String  name) {
+  DataType(int value ,String name, int sizeInBytes) {
     this.precedenceOrder = value;
     this.name = name;
+    this.sizeInBytes = sizeInBytes;
   }
 
   public int getPrecedenceOrder() {
@@ -54,4 +61,9 @@ public enum DataType {
   public boolean isComplexType() {
     return precedenceOrder >= 9 && precedenceOrder <= 11;
   }
+
+  public int getSizeInBytes() {
+    return sizeInBytes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
index 1726fb7..0fb3860 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
@@ -35,7 +35,7 @@ public class CarbonImplicitDimension extends CarbonDimension {
   private static final long serialVersionUID = 3648269871656322681L;
 
   /**
-   * List of encoding that are chained to encode the data for this column
+   * List of encoding that are chained to apply the data for this column
    */
   private List<Encoding> encodingList;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index eb4bf03..f5b8116 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -63,7 +63,7 @@ public class ColumnSchema implements Serializable {
   private boolean isColumnar = true;
 
   /**
-   * List of encoding that are chained to encode the data for this column
+   * List of encoding that are chained to apply the data for this column
    */
   private List<Encoding> encodingList;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 8fa0348..ad17240 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -96,22 +96,21 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
     if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
       switch (carbonMeasure.getDataType()) {
         case SHORT:
-          return (short)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+          return (short)dataChunk.getColumnPage().getLong(index);
         case INT:
-          return (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+          return (int)dataChunk.getColumnPage().getLong(index);
         case LONG:
-          return dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+          return dataChunk.getColumnPage().getLong(index);
         case DECIMAL:
           BigDecimal bigDecimalMsrValue =
-              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+              dataChunk.getColumnPage().getDecimal(index);
           if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
             bigDecimalMsrValue =
                 bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
           }
-          return org.apache.spark.sql.types.Decimal
-              .apply(bigDecimalMsrValue);
+          return org.apache.spark.sql.types.Decimal.apply(bigDecimalMsrValue);
         default:
-          return dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+          return dataChunk.getColumnPage().getDouble(index);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index c5b5ade..f139802 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -56,7 +56,6 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
     }
     //default implementation if the data types are different for the resultsets
     if (elRes.getDataType() != erRes.getDataType()) {
-      //            result = elRes.getString().equals(erRes.getString());
       if (elRes.getDataType().getPrecedenceOrder() < erRes.getDataType().getPrecedenceOrder()) {
         val1 = erRes;
         val2 = elRes;