You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:52 UTC

[46/56] [abbrv] carbondata git commit: add unsafe column page

add unsafe column page


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7359601b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7359601b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7359601b

Branch: refs/heads/streaming_ingest
Commit: 7359601b4a7808311bda33437333d627a6f8400d
Parents: 94c4910
Author: jackylk <ja...@huawei.com>
Authored: Mon Jun 19 11:59:37 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Jun 19 11:59:37 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  10 +
 .../chunk/impl/MeasureRawColumnChunk.java       |   3 +-
 .../chunk/reader/MeasureColumnChunkReader.java  |   3 +-
 .../AbstractMeasureChunkReaderV2V3Format.java   |   3 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |   3 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |   3 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |   7 +-
 .../core/datastore/page/ColumnPage.java         | 496 ++++++-------
 .../core/datastore/page/LazyColumnPage.java     |   7 +-
 .../datastore/page/SafeFixLengthColumnPage.java | 310 ++++++++
 .../datastore/page/SafeVarLengthColumnPage.java |  71 ++
 .../page/UnsafeFixLengthColumnPage.java         | 334 +++++++++
 .../page/UnsafeVarLengthColumnPage.java         | 128 ++++
 .../datastore/page/VarLengthColumnPageBase.java | 247 +++++++
 .../page/encoding/AdaptiveCompressionCodec.java |   5 +-
 .../page/encoding/AdaptiveIntegerCodec.java     |   9 +-
 .../page/encoding/ColumnPageCodec.java          |   5 +-
 .../page/encoding/CompressionCodec.java         |   5 +-
 .../page/encoding/DeltaIntegerCodec.java        |   9 +-
 .../encoding/UpscaleDeltaFloatingCodec.java     |   9 +-
 .../page/encoding/UpscaleFloatingCodec.java     |   9 +-
 .../page/statistics/ColumnPageStatsVO.java      |  15 +-
 .../core/memory/IntPointerBuffer.java           |  16 +-
 .../core/memory/UnsafeMemoryManager.java        |  27 +-
 .../core/metadata/datatype/DataType.java        |   3 +
 .../executer/RowLevelFilterExecuterImpl.java    |  31 +-
 .../core/scan/result/AbstractScannedResult.java |  30 +-
 .../result/impl/FilterQueryScannedResult.java   |   4 +-
 .../core/scan/scanner/impl/FilterScanner.java   |   3 +-
 .../apache/carbondata/core/util/ByteUtil.java   |  12 +
 .../examples/CarbonSessionExample.scala         |   2 +-
 .../carbondata/examples/CompareTest.scala       |   4 +-
 .../TestNullAndEmptyFieldsUnsafe.scala          | 119 ++++
 .../TestLoadDataWithHiveSyntaxUnsafe.scala      | 709 +++++++++++++++++++
 .../carbondata/CarbonDataSourceSuite.scala      |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  10 +-
 .../carbondata/processing/store/TablePage.java  |  39 +-
 .../processing/store/TablePageEncoder.java      |   6 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   2 +
 39 files changed, 2333 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ec13bd6..96c26b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1067,6 +1067,16 @@ public final class CarbonCommonConstants {
   public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
 
   /**
+   * to enable unsafe column page in write step
+   */
+  public static final String ENABLE_UNSAFE_COLUMN_PAGE_LOADING = "enable.unsafe.columnpage";
+
+  /**
+   * default value of ENABLE_UNSAFE_COLUMN_PAGE_LOADING
+   */
+  public static final String ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT = "false";
+
+  /**
    * to enable offheap sort
    */
   public static final String ENABLE_UNSAFE_SORT = "enable.unsafe.sort";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 4702abd..143dd4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Contains raw measure data
@@ -80,7 +81,7 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
       if (dataChunks[index] == null) {
         dataChunks[index] = chunkReader.convertToMeasureChunk(this, index);
       }
-    } catch (IOException e) {
+    } catch (IOException | MemoryException e) {
       throw new RuntimeException(e);
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index 39789b1..dba6823 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  * Reader interface for reading the measure blocks from file
@@ -55,6 +56,6 @@ public interface MeasureColumnChunkReader {
    * @throws IOException
    */
   MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
-      int pageNumber) throws IOException;
+      int pageNumber) throws IOException, MemoryException;
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index eba1777..2f5af87 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
@@ -129,7 +130,7 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
 
 
   protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk,
-      DataChunk2 measureColumnChunk, int copyPoint) {
+      DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException {
     // for measure, it should have only one ValueEncoderMeta
     assert (measureColumnChunk.getEncoder_meta().size() == 1);
     byte[] encodedMeta = measureColumnChunk.getEncoder_meta().get(0).array();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index cdaaf81..6e59b9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
@@ -92,7 +93,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
 
   @Override
   public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
-      int pageNumber) throws IOException {
+      int pageNumber) throws IOException, MemoryException {
     int blockIndex = measureRawColumnChunk.getBlockletId();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index a94193a..d90c7fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -105,7 +106,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   }
 
   public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measureRawColumnChunk,
-      int pageNumber) throws IOException {
+      int pageNumber) throws IOException, MemoryException {
     MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     int copyPoint = measureRawColumnChunk.getOffSet();
     int blockIndex = measureRawColumnChunk.getBlockletId();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 325387c..2ca7193 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -198,8 +199,10 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
    * @param pageNumber            number
    * @return DimensionColumnDataChunk
    */
-  @Override public MeasureColumnDataChunk convertToMeasureChunk(
-      MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+  @Override
+  public MeasureColumnDataChunk convertToMeasureChunk(
+      MeasureRawColumnChunk measureRawColumnChunk, int pageNumber)
+      throws IOException, MemoryException {
     MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     // data chunk of blocklet column
     DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 863d1b0..3c47a8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -18,15 +18,13 @@
 package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -39,130 +37,225 @@ import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
 import static org.apache.carbondata.core.metadata.datatype.DataType.LONG;
 import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
 
-/**
- * Represent a columnar data in one page for one column.
- */
-public class ColumnPage {
-
-  private final int pageSize;
-  private DataType dataType;
-  private ColumnPageStatsVO stats;
+public abstract class ColumnPage {
 
-  // Only one of following fields will be used
-  private byte[] byteData;
-  private short[] shortData;
-  private int[] intData;
-  private long[] longData;
-  private float[] floatData;
-  private double[] doubleData;
+  protected final int pageSize;
+  protected final DataType dataType;
 
-  // for string and decimal data
-  private byte[][] byteArrayData;
+  // statistics of this column page
+  private ColumnPageStatsVO stats;
 
   // The index of the rowId whose value is null, will be set to 1
   private BitSet nullBitSet;
 
+  protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
+          CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
+
   protected ColumnPage(DataType dataType, int pageSize) {
-    this.pageSize = pageSize;
     this.dataType = dataType;
+    this.pageSize = pageSize;
+    this.stats = new ColumnPageStatsVO(dataType);
+    this.nullBitSet = new BitSet(pageSize);
+  }
+
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  public ColumnPageStatsVO getStatistics() {
+    return stats;
+  }
+
+  public int getPageSize() {
+    return pageSize;
   }
 
-  // create a new page
-  public static ColumnPage newPage(DataType dataType, int pageSize) {
+  private static ColumnPage createVarLengthPage(DataType dataType, int pageSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeVarLengthColumnPage(dataType, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeVarLengthColumnPage(dataType, pageSize);
+    }
+  }
+
+  private static ColumnPage createFixLengthPage(DataType dataType, int pageSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeFixLengthColumnPage(dataType, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeFixLengthColumnPage(dataType, pageSize);
+    }
+  }
+
+  private static ColumnPage createPage(DataType dataType, int pageSize) {
+    if (dataType.equals(BYTE_ARRAY) | dataType.equals(DECIMAL)) {
+      return createVarLengthPage(dataType, pageSize);
+    } else {
+      return createFixLengthPage(dataType, pageSize);
+    }
+  }
+
+  public static ColumnPage newVarLengthPath(DataType dataType, int pageSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeVarLengthColumnPage(dataType, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeVarLengthColumnPage(dataType, pageSize);
+    }
+  }
+
+  /**
+   * Create a new page of dataType and number of row = pageSize
+   */
+  public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException {
     ColumnPage instance;
-    switch (dataType) {
-      case BYTE:
-        instance = newBytePage(new byte[pageSize]);
-        break;
-      case SHORT:
-        instance = newShortPage(new short[pageSize]);
-        break;
-      case INT:
-        instance = newIntPage(new int[pageSize]);
-        break;
-      case LONG:
-        instance = newLongPage(new long[pageSize]);
-        break;
-      case FLOAT:
-        instance = newFloatPage(new float[pageSize]);
-        break;
-      case DOUBLE:
-        instance = newDoublePage(new double[pageSize]);
-        break;
-      case DECIMAL:
-        instance = newDecimalPage(new byte[pageSize][]);
-        break;
-      case BYTE_ARRAY:
-        instance = newVarLengthPage(new byte[pageSize][]);
-        break;
-      default:
-        throw new RuntimeException("Unsupported data dataType: " + dataType);
+    if (unsafe) {
+      switch (dataType) {
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+          instance = new UnsafeFixLengthColumnPage(dataType, pageSize);
+          break;
+        case DECIMAL:
+        case BYTE_ARRAY:
+          instance = new UnsafeVarLengthColumnPage(dataType, pageSize);
+          break;
+        default:
+          throw new RuntimeException("Unsupported data dataType: " + dataType);
+      }
+    } else {
+      switch (dataType) {
+        case BYTE:
+          instance = newBytePage(new byte[pageSize]);
+          break;
+        case SHORT:
+          instance = newShortPage(new short[pageSize]);
+          break;
+        case INT:
+          instance = newIntPage(new int[pageSize]);
+          break;
+        case LONG:
+          instance = newLongPage(new long[pageSize]);
+          break;
+        case FLOAT:
+          instance = newFloatPage(new float[pageSize]);
+          break;
+        case DOUBLE:
+          instance = newDoublePage(new double[pageSize]);
+          break;
+        case DECIMAL:
+          instance = newDecimalPage(new byte[pageSize][]);
+          break;
+        default:
+          throw new RuntimeException("Unsupported data dataType: " + dataType);
+      }
     }
-    instance.stats = new ColumnPageStatsVO(dataType);
-    instance.nullBitSet = new BitSet(pageSize);
     return instance;
   }
 
-  protected static ColumnPage newBytePage(byte[] byteData) {
-    ColumnPage columnPage = new ColumnPage(BYTE, byteData.length);
-    columnPage.byteData = byteData;
+  private static ColumnPage newBytePage(byte[] byteData) {
+    ColumnPage columnPage = createPage(BYTE, byteData.length);
+    columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  protected static ColumnPage newShortPage(short[] shortData) {
-    ColumnPage columnPage = new ColumnPage(SHORT, shortData.length);
-    columnPage.shortData = shortData;
+  private static ColumnPage newShortPage(short[] shortData) {
+    ColumnPage columnPage = createPage(SHORT, shortData.length);
+    columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  protected static ColumnPage newIntPage(int[] intData) {
-    ColumnPage columnPage = new ColumnPage(INT, intData.length);
-    columnPage.intData = intData;
+  private static ColumnPage newIntPage(int[] intData) {
+    ColumnPage columnPage = createPage(INT, intData.length);
+    columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  protected static ColumnPage newLongPage(long[] longData) {
-    ColumnPage columnPage = new ColumnPage(LONG, longData.length);
-    columnPage.longData = longData;
+  private static ColumnPage newLongPage(long[] longData) {
+    ColumnPage columnPage = createPage(LONG, longData.length);
+    columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  protected static ColumnPage newFloatPage(float[] floatData) {
-    ColumnPage columnPage = new ColumnPage(FLOAT, floatData.length);
-    columnPage.floatData = floatData;
+  private static ColumnPage newFloatPage(float[] floatData) {
+    ColumnPage columnPage = createPage(FLOAT, floatData.length);
+    columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  protected static ColumnPage newDoublePage(double[] doubleData) {
-    ColumnPage columnPage = new ColumnPage(DOUBLE, doubleData.length);
-    columnPage.doubleData = doubleData;
+  private static ColumnPage newDoublePage(double[] doubleData) {
+    ColumnPage columnPage = createPage(DOUBLE, doubleData.length);
+    columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  protected static ColumnPage newDecimalPage(byte[][] decimalData) {
-    ColumnPage columnPage = new ColumnPage(DECIMAL, decimalData.length);
-    columnPage.byteArrayData = decimalData;
+  private static ColumnPage newDecimalPage(byte[][] byteArray) {
+    ColumnPage columnPage = createPage(DECIMAL, byteArray.length);
+    columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  protected static ColumnPage newVarLengthPage(byte[][] stringData) {
-    ColumnPage columnPage = new ColumnPage(BYTE_ARRAY, stringData.length);
-    columnPage.byteArrayData = stringData;
-    return columnPage;
+  private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray);
   }
 
-  public DataType getDataType() {
-    return dataType;
-  }
+  /**
+   * Set byte values to page
+   */
+  public abstract void setBytePage(byte[] byteData);
 
-  public ColumnPageStatsVO getStatistics() {
-    return stats;
-  }
+  /**
+   * Set short values to page
+   */
+  public abstract void setShortPage(short[] shortData);
 
-  public int getPageSize() {
-    return pageSize;
-  }
+  /**
+   * Set int values to page
+   */
+  public abstract void setIntPage(int[] intData);
 
+  /**
+   * Set long values to page
+   */
+  public abstract void setLongPage(long[] longData);
+
+  /**
+   * Set float values to page
+   */
+  public abstract void setFloatPage(float[] floatData);
+
+  /**
+   * Set double value to page
+   */
+  public abstract void setDoublePage(double[] doubleData);
+
+  /**
+   * Set byte array to page
+   */
+  public abstract void setByteArrayPage(byte[][] byteArray);
+
+  /**
+   * free memory as needed
+   */
+  public abstract void freeMemory();
+
+  /**
+   * Set value at rowId
+   */
   public void putData(int rowId, Object value) {
     if (value == null) {
       putNull(rowId);
@@ -187,8 +280,6 @@ public class ColumnPage {
         putDouble(rowId, (double) value);
         break;
       case DECIMAL:
-        putDecimalBytes(rowId, (byte[]) value);
-        break;
       case BYTE_ARRAY:
         putBytes(rowId, (byte[]) value);
         break;
@@ -201,62 +292,44 @@ public class ColumnPage {
   /**
    * Set byte value at rowId
    */
-  public void putByte(int rowId, byte value) {
-    byteData[rowId] = value;
-  }
+  public abstract void putByte(int rowId, byte value);
 
   /**
    * Set short value at rowId
    */
-  public void putShort(int rowId, short value) {
-    shortData[rowId] = value;
-  }
+  public abstract void putShort(int rowId, short value);
 
   /**
    * Set integer value at rowId
    */
-  public void putInt(int rowId, int value) {
-    intData[rowId] = value;
-  }
+  public abstract void putInt(int rowId, int value);
 
   /**
    * Set long value at rowId
    */
-  public void putLong(int rowId, long value) {
-    longData[rowId] = value;
-  }
+  public abstract void putLong(int rowId, long value);
 
   /**
    * Set double value at rowId
    */
-  public void putDouble(int rowId, double value) {
-    doubleData[rowId] = value;
-  }
+  public abstract void putDouble(int rowId, double value);
 
   /**
-   * Set decimal value at rowId
+   * Set byte array value at rowId
    */
-  public void putDecimalBytes(int rowId, byte[] decimalInBytes) {
-    // do LV (length value) coded of input bytes
-    ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
-        CarbonCommonConstants.INT_SIZE_IN_BYTE);
-    byteBuffer.putInt(decimalInBytes.length);
-    byteBuffer.put(decimalInBytes);
-    byteBuffer.flip();
-    byteArrayData[rowId] = byteBuffer.array();
-  }
+  public abstract void putBytes(int rowId, byte[] bytes);
 
   /**
-   * Set string value at rowId
+   * Set byte array from offset to length at rowId
    */
-  public void putBytes(int rowId, byte[] bytes) {
-    byteArrayData[rowId] = bytes;
-  }
+  public abstract void putBytes(int rowId, byte[] bytes, int offset, int length);
+
+  private static final byte[] ZERO = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
 
   /**
    * Set null at rowId
    */
-  public void putNull(int rowId) {
+  private void putNull(int rowId) {
     nullBitSet.set(rowId);
     switch (dataType) {
       case BYTE:
@@ -275,175 +348,100 @@ public class ColumnPage {
         putDouble(rowId, 0.0);
         break;
       case DECIMAL:
-        byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
-        putDecimalBytes(rowId, decimalInBytes);
+        putBytes(rowId, ZERO);
         break;
     }
   }
 
   /**
-   * Get byte value at rowId
+   * Get null bitset page
    */
-  public byte getByte(int rowId) {
-    return byteData[rowId];
+  public BitSet getNullBitSet() {
+    return nullBitSet;
   }
 
   /**
+   * Get byte value at rowId
+   */
+  public abstract byte getByte(int rowId);
+
+  /**
    * Get short value at rowId
    */
-  public short getShort(int rowId) {
-    return shortData[rowId];
-  }
+  public abstract short getShort(int rowId);
 
   /**
    * Get int value at rowId
    */
-  public int getInt(int rowId) {
-    return intData[rowId];
-  }
+  public abstract int getInt(int rowId);
 
   /**
    * Get long value at rowId
    */
-  public long getLong(int rowId) {
-    return longData[rowId];
-  }
+  public abstract long getLong(int rowId);
 
   /**
    * Get float value at rowId
    */
-  public float getFloat(int rowId) {
-    return floatData[rowId];
-  }
+  public abstract float getFloat(int rowId);
 
   /**
    * Get double value at rowId
    */
-  public double getDouble(int rowId) {
-    return doubleData[rowId];
-  }
+  public abstract double getDouble(int rowId);
 
   /**
    * Get decimal value at rowId
    */
-  public byte[] getDecimalBytes(int rowId) {
-    return byteArrayData[rowId];
-  }
-
-  public BigDecimal getDecimal(int rowId) {
-    byte[] bytes = getDecimalBytes(rowId);
-    return DataTypeUtil.byteToBigDecimal(bytes);
-  }
+  public abstract BigDecimal getDecimal(int rowId);
 
   /**
    * Get byte value page
    */
-  public byte[] getBytePage() {
-    return byteData;
-  }
+  public abstract byte[] getBytePage();
 
   /**
    * Get short value page
    */
-  public short[] getShortPage() {
-    return shortData;
-  }
+  public abstract short[] getShortPage();
 
   /**
    * Get int value page
    */
-  public int[] getIntPage() {
-    return intData;
-  }
+  public abstract int[] getIntPage();
 
   /**
    * Get long value page
    */
-  public long[] getLongPage() {
-    return longData;
-  }
+  public abstract long[] getLongPage();
 
   /**
    * Get float value page
    */
-  public float[] getFloatPage() {
-    return floatData;
-  }
+  public abstract float[] getFloatPage();
 
   /**
    * Get double value page
    */
-  public double[] getDoublePage() {
-    return doubleData;
-  }
+  public abstract double[] getDoublePage();
 
   /**
-   * Get decimal value page
+   * Get variable length page data
    */
-  public byte[][] getDecimalPage() {
-    return byteArrayData;
-  }
+  public abstract byte[][] getByteArrayPage();
 
   /**
-   * Get string page
+   * For variable length page, get the flattened data
    */
-  public byte[][] getByteArrayPage() {
-    return byteArrayData;
-  }
+  public abstract byte[] getFlattenedBytePage();
 
   /**
-   * Get null bitset page
+   * Encode the page data by codec (Visitor)
    */
-  public BitSet getNullBitSet() {
-    return nullBitSet;
-  }
-
-  public void freeMemory() {
-  }
+  public abstract void encode(PrimitiveCodec codec);
 
   /**
-   * apply encoding to page data
-   * @param codec type of transformation
-   */
-  public void encode(PrimitiveCodec codec) {
-    switch (dataType) {
-      case BYTE:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, byteData[i]);
-        }
-        break;
-      case SHORT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, shortData[i]);
-        }
-        break;
-      case INT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, intData[i]);
-        }
-        break;
-      case LONG:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, longData[i]);
-        }
-        break;
-      case FLOAT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, floatData[i]);
-        }
-        break;
-      case DOUBLE:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, doubleData[i]);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("not support encode on " + dataType + " page");
-    }
-  }
-
-  /**
-   * compress page data using specified compressor
+   * Compress page data using specified compressor
    */
   public byte[] compress(Compressor compressor) {
     switch (dataType) {
@@ -460,21 +458,19 @@ public class ColumnPage {
       case DOUBLE:
         return compressor.compressDouble(getDoublePage());
       case DECIMAL:
-        byte[] flattenedDecimal = ByteUtil.flatten(getDecimalPage());
-        return compressor.compressByte(flattenedDecimal);
+        return compressor.compressByte(getFlattenedBytePage());
       case BYTE_ARRAY:
-        byte[] flattenedString = ByteUtil.flatten(getByteArrayPage());
-        return compressor.compressByte(flattenedString);
+        return compressor.compressByte(getFlattenedBytePage());
       default:
         throw new UnsupportedOperationException("unsupport compress column page: " + dataType);
     }
   }
 
   /**
-   * decompress data and create a column page using the decompressed data
+   * Decompress data and create a column page using the decompressed data
    */
   public static ColumnPage decompress(Compressor compressor, DataType dataType,
-      byte[] compressedData, int offset, int length) {
+      byte[] compressedData, int offset, int length) throws MemoryException {
     switch (dataType) {
       case BYTE:
         byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -495,54 +491,12 @@ public class ColumnPage {
         double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
         return newDoublePage(doubleData);
       case DECIMAL:
-        byte[] decompressed = compressor.unCompressByte(compressedData, offset, length);
-        byte[][] decimal = deflatten(decompressed);
-        return newDecimalPage(decimal);
       case BYTE_ARRAY:
-        decompressed = compressor.unCompressByte(compressedData, offset, length);
-        byte[][] string = deflatten(decompressed);
-        return newVarLengthPage(string);
+        byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
+        return newDecimalPage(lvEncodedBytes);
       default:
         throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType);
     }
   }
 
-  // input byte[] is LV encoded, this function can expand it into byte[][]
-  private static byte[][] deflatten(byte[] input) {
-    int pageSize = Integer.valueOf(
-        CarbonProperties.getInstance().getProperty(
-            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
-            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
-    int numRows = 0;
-    // offset of value of each row in input data
-    int[] offsetOfRow = new int[pageSize];
-    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
-    for (int currentLength = 0; currentLength < input.length;) {
-      buffer.put(input, currentLength, CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      buffer.flip();
-      int valueLength = buffer.getInt();
-      offsetOfRow[numRows] = currentLength + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      currentLength += CarbonCommonConstants.INT_SIZE_IN_BYTE + valueLength;
-      buffer.clear();
-      numRows++;
-    }
-    byte[][] byteArrayData = new byte[numRows][];
-    for (int rowId = 0; rowId < numRows; rowId++) {
-      int valueOffset = offsetOfRow[rowId];
-      int valueLength;
-      if (rowId != numRows - 1) {
-        valueLength = offsetOfRow[rowId + 1] - valueOffset - CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      } else {
-        // last row
-        buffer.put(input, offsetOfRow[rowId] - CarbonCommonConstants.INT_SIZE_IN_BYTE,
-            CarbonCommonConstants.INT_SIZE_IN_BYTE);
-        buffer.flip();
-        valueLength = buffer.getInt();
-      }
-      byte[] value = new byte[valueLength];
-      System.arraycopy(input, valueOffset, value, 0, valueLength);
-      byteArrayData[rowId] = value;
-    }
-    return byteArrayData;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 165e027..1784136 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.core.datastore.page;
  * This is a decorator of column page, it performs transformation lazily (when caller calls getXXX
  * method to get the value from the page)
  */
-public class LazyColumnPage extends ColumnPage {
+public class LazyColumnPage extends SafeFixLengthColumnPage {
 
   // decorated column page
   private ColumnPage columnPage;
@@ -79,4 +79,9 @@ public class LazyColumnPage extends ColumnPage {
         throw new RuntimeException("internal error: " + this.toString());
     }
   }
+
+  @Override
+  public void freeMemory() {
+    columnPage.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
new file mode 100644
index 0000000..34ab01c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Represent a columnar data in one page for one column.
+ */
+public class SafeFixLengthColumnPage extends ColumnPage {
+
+  // Only one of following fields will be used
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private float[] floatData;
+  private double[] doubleData;
+
+  public SafeFixLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+  }
+
+  /**
+   * Set byte value at rowId
+   */
+  @Override
+  public void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  /**
+   * Set short value at rowId
+   */
+  @Override
+  public void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  /**
+   * Set integer value at rowId
+   */
+  @Override
+  public void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  /**
+   * Set long value at rowId
+   */
+  @Override
+  public void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  /**
+   * Set double value at rowId
+   */
+  @Override
+  public void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  /**
+   * Set string value at rowId
+   */
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Get byte value at rowId
+   */
+  @Override
+  public byte getByte(int rowId) {
+    return byteData[rowId];
+  }
+
+  /**
+   * Get short value at rowId
+   */
+  @Override
+  public short getShort(int rowId) {
+    return shortData[rowId];
+  }
+
+  /**
+   * Get int value at rowId
+   */
+  @Override
+  public int getInt(int rowId) {
+    return intData[rowId];
+  }
+
+  /**
+   * Get long value at rowId
+   */
+  @Override
+  public long getLong(int rowId) {
+    return longData[rowId];
+  }
+
+  /**
+   * Get float value at rowId
+   */
+  @Override
+  public float getFloat(int rowId) {
+    return floatData[rowId];
+  }
+
+  /**
+   * Get double value at rowId
+   */
+  @Override
+  public double getDouble(int rowId) {
+    return doubleData[rowId];
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Get byte value page
+   */
+  @Override
+  public byte[] getBytePage() {
+    return byteData;
+  }
+
+  /**
+   * Get short value page
+   */
+  @Override
+  public short[] getShortPage() {
+    return shortData;
+  }
+
+  /**
+   * Get int value page
+   */
+  @Override
+  public int[] getIntPage() {
+    return intData;
+  }
+
+  /**
+   * Get long value page
+   */
+  @Override
+  public long[] getLongPage() {
+    return longData;
+  }
+
+  /**
+   * Get float value page
+   */
+  @Override
+  public float[] getFloatPage() {
+    return floatData;
+  }
+
+  /**
+   * Get double value page
+   */
+  @Override
+  public double[] getDoublePage() {
+    return doubleData;
+  }
+
+  /**
+   * Get string page
+   */
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getFlattenedBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Set byte values to page
+   */
+  @Override
+  public void setBytePage(byte[] byteData) {
+    this.byteData = byteData;
+  }
+
+  /**
+   * Set short values to page
+   */
+  @Override
+  public void setShortPage(short[] shortData) {
+    this.shortData = shortData;
+  }
+
+  /**
+   * Set int values to page
+   */
+  @Override
+  public void setIntPage(int[] intData) {
+    this.intData = intData;
+  }
+
+  /**
+   * Set long values to page
+   */
+  @Override
+  public void setLongPage(long[] longData) {
+    this.longData = longData;
+  }
+
+  /**
+   * Set float values to page
+   */
+  @Override
+  public void setFloatPage(float[] floatData) {
+    this.floatData = floatData;
+  }
+
+  /**
+   * Set double value to page
+   */
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    this.doubleData = doubleData;
+  }
+
+  /**
+   * Set decimal values to page
+   */
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void freeMemory() {
+  }
+
+  /**
+   * apply encoding to page data
+   * @param codec type of transformation
+   */
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    switch (dataType) {
+      case BYTE:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, byteData[i]);
+        }
+        break;
+      case SHORT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, shortData[i]);
+        }
+        break;
+      case INT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, intData[i]);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, longData[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, floatData[i]);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, doubleData[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("not support encode on " + dataType + " page");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
new file mode 100644
index 0000000..3a76f55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
+
+  // for string and decimal data
+  private byte[][] byteArrayData;
+
+  SafeVarLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    byteArrayData = new byte[pageSize][];
+  }
+
+  @Override
+  public void freeMemory() {
+  }
+
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    byteArrayData[rowId] = bytes;
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    byteArrayData[rowId] = new byte[length];
+    System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    byte[] bytes = byteArrayData[rowId];
+    return DataTypeUtil.byteToBigDecimal(bytes);
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    byteArrayData = byteArray;
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    return byteArrayData;
+  }
+
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
new file mode 100644
index 0000000..ac1961a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
+
+// This extension uses unsafe memory to store page data, for fix length data type only (byte,
+// short, integer, long, float, double)
+public class UnsafeFixLengthColumnPage extends ColumnPage {
+  // memory allocated by Unsafe
+  private MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  private Object baseAddress;
+
+  // base offset of memoryBlock
+  private long baseOffset;
+
+  private static final int byteBits = BYTE.getSizeBits();
+  private static final int shortBits = DataType.SHORT.getSizeBits();
+  private static final int intBits = DataType.INT.getSizeBits();
+  private static final int longBits = DataType.LONG.getSizeBits();
+  private static final int floatBits = DataType.FLOAT.getSizeBits();
+  private static final int doubleBits = DataType.DOUBLE.getSizeBits();
+
+  UnsafeFixLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
+    super(dataType, pageSize);
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        int size = pageSize << dataType.getSizeBits();
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      case DECIMAL:
+      case STRING:
+        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    long offset = rowId << byteBits;
+    CarbonUnsafe.unsafe.putByte(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    long offset = rowId << shortBits;
+    CarbonUnsafe.unsafe.putShort(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    long offset = rowId << intBits;
+    CarbonUnsafe.unsafe.putInt(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    long offset = rowId << longBits;
+    CarbonUnsafe.unsafe.putLong(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    long offset = rowId << doubleBits;
+    CarbonUnsafe.unsafe.putDouble(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    long offset = rowId << byteBits;
+    return CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    long offset = rowId << shortBits;
+    return CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    long offset = rowId << intBits;
+    return CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    long offset = rowId << longBits;
+    return CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    long offset = rowId << floatBits;
+    return CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    long offset = rowId << doubleBits;
+    return CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    byte[] data = new byte[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << byteBits;
+      data[i] = CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public short[] getShortPage() {
+    short[] data = new short[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << shortBits;
+      data[i] = CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public int[] getIntPage() {
+    int[] data = new int[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << intBits;
+      data[i] = CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public long[] getLongPage() {
+    long[] data = new long[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << longBits;
+      data[i] = CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    float[] data = new float[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << floatBits;
+      data[i] = CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    double[] data = new double[getPageSize()];
+    for (int i = 0; i < data.length; i++) {
+      long offset = i << doubleBits;
+      data[i] = CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset);
+    }
+    return data;
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getFlattenedBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    for (int i = 0; i < byteData.length; i++) {
+      long offset = i << byteBits;
+      CarbonUnsafe.unsafe.putByte(baseAddress, baseOffset + offset, byteData[i]);
+    }
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    for (int i = 0; i < shortData.length; i++) {
+      long offset = i << shortBits;
+      CarbonUnsafe.unsafe.putShort(baseAddress, baseOffset + offset, shortData[i]);
+    }
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    for (int i = 0; i < intData.length; i++) {
+      long offset = i << intBits;
+      CarbonUnsafe.unsafe.putInt(baseAddress, baseOffset + offset, intData[i]);
+    }
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    for (int i = 0; i < longData.length; i++) {
+      long offset = i << longBits;
+      CarbonUnsafe.unsafe.putLong(baseAddress, baseOffset + offset, longData[i]);
+    }
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    for (int i = 0; i < floatData.length; i++) {
+      long offset = i << floatBits;
+      CarbonUnsafe.unsafe.putFloat(baseAddress, baseOffset + offset, floatData[i]);
+    }
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    for (int i = 0; i < doubleData.length; i++) {
+      long offset = i << doubleBits;
+      CarbonUnsafe.unsafe.putDouble(baseAddress, baseOffset + offset, doubleData[i]);
+    }
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    int pageSize = getPageSize();
+    switch (dataType) {
+      case BYTE:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << byteBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getByte(baseAddress, baseOffset + offset));
+        }
+        break;
+      case SHORT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << shortBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getShort(baseAddress, baseOffset + offset));
+        }
+        break;
+      case INT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << intBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getInt(baseAddress, baseOffset + offset));
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << longBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getLong(baseAddress, baseOffset + offset));
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << floatBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getFloat(baseAddress, baseOffset + offset));
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << doubleBits;
+          codec.encode(i, CarbonUnsafe.unsafe.getDouble(baseAddress, baseOffset + offset));
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public byte[] compress(Compressor compressor) {
+    // TODO: use zero-copy raw compression
+    return super.compress(compressor);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
new file mode 100644
index 0000000..75b5312
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+// This extension uses unsafe memory to store page data, for variable length data type (string,
+// decimal)
+public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
+
+  // memory allocated by Unsafe
+  private MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  private Object baseAddress;
+
+  // base offset of memoryBlock
+  private long baseOffset;
+
+  // size of the allocated memory, in bytes
+  private int capacity;
+
+  // default size for each row, grows as needed
+  private static final int DEFAULT_ROW_SIZE = 8;
+
+  private static final double FACTOR = 1.25;
+
+  UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
+    super(dataType, pageSize);
+    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+    baseAddress = memoryBlock.getBaseObject();
+    baseOffset = memoryBlock.getBaseOffset();
+  }
+
+  @Override
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  private void ensureMemory(int requestSize) throws MemoryException {
+    if (totalLength + requestSize > capacity) {
+      int newSize = 2 * capacity;
+      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(newSize);
+      CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset,
+          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
+      memoryBlock = newBlock;
+      baseAddress = newBlock.getBaseObject();
+      baseOffset = newBlock.getBaseOffset();
+      capacity = newSize;
+    }
+  }
+
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    try {
+      ensureMemory(bytes.length);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
+    CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET,
+        baseAddress, baseOffset + rowOffset[rowId], bytes.length);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
+        baseAddress, baseOffset + rowOffset[rowId], length);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+
+    return DataTypeUtil.byteToBigDecimal(bytes);
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    byte[][] bytes = new byte[pageSize][];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      byte[] rowData = new byte[length];
+      CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+          rowData, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      bytes[rowId] = rowData;
+    }
+    return bytes;
+  }
+
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
new file mode 100644
index 0000000..fa5f478
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
+
+public abstract class VarLengthColumnPageBase extends ColumnPage {
+
+  // the offset of row in the unsafe memory, its size is pageSize + 1
+  int[] rowOffset;
+
+  // the length of bytes added in the page
+  int totalLength;
+
+  VarLengthColumnPageBase(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    rowOffset = new int[pageSize + 1];
+    totalLength = 0;
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Create a new column page based on the LV (Length Value) encoded bytes
+   */
+  static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes) throws MemoryException {
+    // extract length and data, set them to rowOffset and unsafe memory correspondingly
+    int rowId = 0;
+    List<Integer> rowOffset = new ArrayList<>();
+    List<Integer> rowLength = new ArrayList<>();
+    int length;
+    int offset;
+    int lvEncodedOffset = 0;
+
+    // extract Length field in input and calculate total length
+    for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
+      length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
+      rowOffset.add(offset);
+      rowLength.add(length);
+      lvEncodedOffset += 4 + length;
+      rowId++;
+    }
+    rowOffset.add(offset);
+
+    int numRows = rowId;
+
+    VarLengthColumnPageBase page;
+    if (unsafe) {
+      page = new UnsafeVarLengthColumnPage(DECIMAL, numRows);
+    } else {
+      page = new SafeVarLengthColumnPage(DECIMAL, numRows);
+    }
+
+    // set total length and rowOffset in page
+    page.totalLength = offset;
+    page.rowOffset = new int[rowId + 1];
+    for (int i = 0; i < rowId + 1; i++) {
+      page.rowOffset[i] = rowOffset.get(i);
+    }
+
+    // set data in page
+    lvEncodedOffset = 0;
+    for (int i = 0; i < numRows; i++) {
+      length = rowLength.get(i);
+      page.putBytes(i, lvEncodedBytes, lvEncodedOffset + 4, length);
+      lvEncodedOffset += 4 + length;
+    }
+
+    return page;
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  abstract void putBytesAtRow(int rowId, byte[] bytes);
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes) {
+    if (rowId == 0) {
+      rowOffset[0] = 0;
+    }
+    rowOffset[rowId + 1] = rowOffset[rowId] + bytes.length;
+    putBytesAtRow(rowId, bytes);
+    totalLength += bytes.length;
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short[] getShortPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int[] getIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long[] getLongPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  /**
+   * Copy `length` bytes from data at rowId to dest start from destOffset
+   */
+  abstract void copyBytes(int rowId, byte[] dest, int destOffset, int length);
+
+  @Override
+  public byte[] getFlattenedBytePage() {
+    // output LV encoded byte array
+    int offset = 0;
+    byte[] data = new byte[totalLength + pageSize * 4];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      ByteUtil.setInt(data, offset, length);
+      copyBytes(rowId, data, offset + 4, length);
+      offset += 4 + length;
+    }
+    return data;
+  }
+
+  @Override
+  public void encode(PrimitiveCodec codec) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index c843b55..6127583 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -52,9 +53,9 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
 
   public abstract String getName();
 
-  public abstract byte[] encode(ColumnPage input);
+  public abstract byte[] encode(ColumnPage input) throws MemoryException;
 
-  public abstract ColumnPage decode(byte[] input, int offset, int length);
+  public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
index f768a14..47bc390 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -48,18 +49,20 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) {
+  public byte[] encode(ColumnPage input) throws MemoryException {
     if (srcDataType.equals(targetDataType)) {
       return input.compress(compressor);
     } else {
       encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
       input.encode(codec);
-      return encodedPage.compress(compressor);
+      byte[] result = encodedPage.compress(compressor);
+      encodedPage.freeMemory();
+      return result;
     }
   }
 
   @Override
-  public ColumnPage decode(byte[] input, int offset, int length) {
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     if (srcDataType.equals(targetDataType)) {
       return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index 21913be..afba173 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
 
 /**
  *  Codec for a column page data, implementation should not keep state across pages,
@@ -35,7 +36,7 @@ public interface ColumnPageCodec {
    * @param input column page to apply
    * @return encoded data
    */
-  byte[] encode(ColumnPage input);
+  byte[] encode(ColumnPage input) throws MemoryException;
 
   /**
    * decode byte array from offset to a column page
@@ -44,5 +45,5 @@ public interface ColumnPageCodec {
    * @param length length of data to decode
    * @return decoded data
    */
-  ColumnPage decode(byte[] input, int offset, int length);
+  ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
index 4568503..722ba21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -30,7 +31,7 @@ public class CompressionCodec implements ColumnPageCodec {
   private Compressor compressor;
   private DataType dataType;
 
-  protected CompressionCodec(DataType dataType, Compressor compressor) {
+  private CompressionCodec(DataType dataType, Compressor compressor) {
     this.compressor = compressor;
     this.dataType = dataType;
   }
@@ -50,7 +51,7 @@ public class CompressionCodec implements ColumnPageCodec {
   }
 
   @Override
-  public ColumnPage decode(byte[] input, int offset, int length) {
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     return ColumnPage.decompress(compressor, dataType, input, offset, length);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
index e8e7779..e3ed032 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -63,14 +64,16 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) {
+  public byte[] encode(ColumnPage input) throws MemoryException {
     encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
     input.encode(codec);
-    return encodedPage.compress(compressor);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return result;
   }
 
   @Override
-  public ColumnPage decode(byte[] input, int offset, int length) {
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
     return LazyColumnPage.newPage(page, codec);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
index c58a96f..70c761f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -56,14 +57,16 @@ public class UpscaleDeltaFloatingCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) {
+  public byte[] encode(ColumnPage input) throws MemoryException {
     encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
     input.encode(codec);
-    return encodedPage.compress(compressor);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return result;
   }
 
   @Override
-  public ColumnPage decode(byte[] input, int offset, int length) {
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
     return LazyColumnPage.newPage(page, codec);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
index 4f5ee13..ae80684 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -50,15 +51,17 @@ public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) {
+  public byte[] encode(ColumnPage input) throws MemoryException {
     encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
     input.encode(codec);
-    return encodedPage.compress(compressor);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return result;
   }
 
 
   @Override
-  public ColumnPage decode(byte[] input, int offset, int length) {
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
     return LazyColumnPage.newPage(page, codec);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
index a5b3148..642e6b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
@@ -175,22 +175,23 @@ public class ColumnPageStatsVO {
   private byte[] getValueAsBytes(Object value) {
     ByteBuffer b;
     switch (dataType) {
-      case DOUBLE:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
         b = ByteBuffer.allocate(8);
-        b.putDouble((Double) value);
+        b.putLong((Long) value);
         b.flip();
         return b.array();
-      case LONG:
-      case INT:
-      case SHORT:
+      case DOUBLE:
         b = ByteBuffer.allocate(8);
-        b.putLong((Long) value);
+        b.putDouble((Double) value);
         b.flip();
         return b.array();
       case DECIMAL:
         return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
       default:
-        throw new IllegalArgumentException("Invalid data type");
+        throw new IllegalArgumentException("Invalid data type: " + dataType);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7359601b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
index 5519f2d..0d604fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -44,8 +44,8 @@ public class IntPointerBuffer {
     pointerBlock = new int[length];
   }
 
-  public void set(int index, int value) {
-    pointerBlock[index] = value;
+  public void set(int rowId, int value) {
+    pointerBlock[rowId] = value;
   }
 
   public void set(int value) {
@@ -55,16 +55,16 @@ public class IntPointerBuffer {
   }
 
   /**
-   * Returns the value at position {@code index}.
+   * Returns the value at position {@code rowId}.
    */
-  public int get(int index) {
-    assert index >= 0 : "index (" + index + ") should >= 0";
-    assert index < length : "index (" + index + ") should < length (" + length + ")";
+  public int get(int rowId) {
+    assert rowId >= 0 : "rowId (" + rowId + ") should >= 0";
+    assert rowId < length : "rowId (" + rowId + ") should < length (" + length + ")";
     if (pointerBlock == null) {
       return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(),
-          pointerMemoryBlock.getBaseOffset() + (index * 4));
+          pointerMemoryBlock.getBaseOffset() + (rowId << 2));
     }
-    return pointerBlock[index];
+    return pointerBlock[rowId];
   }
 
   public void loadToUnsafe() throws MemoryException {