You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/12 09:55:24 UTC

[3/4] carbondata git commit: [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
index d85d6cd..9bed89f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.Encoding;
 
@@ -65,7 +64,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
     int[] rlePage;
 
     // uncompress the encoded column page
-    byte[] bytes = CompressorFactory.getInstance().getCompressor()
+    byte[] bytes = CompressorFactory.getInstance().getCompressor(
+        encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName())
         .unCompressByte(encodedColumnPage.getEncodedData().array(), offset,
             encodedColumnPage.getPageMetadata().data_page_length);
 
@@ -94,15 +94,10 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
     // disable encoding using local dictionary
     encodedColumnPage.getActualPage().disableLocalDictEncoding();
 
-    // get column spec for existing column page
-    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
-
-    // get the dataType of column
-    DataType dataType = encodedColumnPage.getActualPage().getDataType();
-
     // create a new column page which will have actual data instead of encoded data
     ColumnPage actualDataColumnPage =
-        ColumnPage.newPage(columnSpec, dataType, encodedColumnPage.getActualPage().getPageSize());
+        ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(),
+            encodedColumnPage.getActualPage().getPageSize());
 
     // uncompressed data from encoded column page is dictionary data, get the dictionary data using
     // keygenerator
@@ -120,6 +115,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
           .putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray));
     }
 
+    // get column spec for existing column page
+    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
     FallbackEncodedColumnPage fallBackEncodedColumnPage =
         CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec);
     // here freeing the memory of new column page created as fallback is done and

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 255e078..605fe4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -37,7 +37,7 @@ public class LazyColumnPage extends ColumnPage {
   private ColumnPageValueConverter converter;
 
   private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) {
-    super(columnPage.getColumnSpec(), columnPage.getDataType(), columnPage.getPageSize());
+    super(columnPage.getColumnPageEncoderMeta(), columnPage.getPageSize());
     this.columnPage = columnPage;
     this.converter = converter;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index 904d7ef..fced016 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -76,14 +76,13 @@ public class LocalDictColumnPage extends ColumnPage {
   protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
       LocalDictionaryGenerator localDictionaryGenerator, boolean isComplexTypePrimitive,
       boolean isDecoderBasedFallBackEnabled) {
-    super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
-        actualDataColumnPage.getPageSize());
+    super(actualDataColumnPage.getColumnPageEncoderMeta(), actualDataColumnPage.getPageSize());
     // if threshold is not reached then create page level dictionary
     // for encoding with local dictionary
     if (!localDictionaryGenerator.isThresholdReached()) {
       pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
           actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType(),
-          isComplexTypePrimitive);
+          isComplexTypePrimitive, actualDataColumnPage.getColumnCompressorName());
       this.encodedDataColumnPage = encodedColumnpage;
       this.keyGenerator = KeyGeneratorFactory
           .getKeyGenerator(new int[] { CarbonCommonConstants.LOCAL_DICTIONARY_MAX + 1 });

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index 89ac4a4..d3e945d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -36,8 +35,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
   private byte[] shortIntData;
   private byte[][] byteArrayData;
 
-  SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     byteArrayData = new byte[pageSize][];
   }
 
@@ -189,8 +188,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
         }
         break;
       default:
-        throw new UnsupportedOperationException(
-            "not support value conversion on " + dataType + " page");
+        throw new UnsupportedOperationException("not support value conversion on "
+            + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 82f1510..b355220 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -45,8 +45,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   // total number of entries in array
   private int arrayElementCount = 0;
 
-  SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     this.fixedLengthdata = new byte[pageSize][];
   }
 
@@ -120,17 +120,20 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getDecimalPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**
@@ -190,7 +193,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -267,7 +271,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getLVFlattenedBytePage() throws IOException {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -345,7 +350,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -366,33 +372,33 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, byteData[i]);
       }
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, shortData[i]);
       }
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, intData[i]);
       }
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, longData[i]);
       }
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, floatData[i]);
       }
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, doubleData[i]);
       }
     } else {
-      throw new UnsupportedOperationException("not support value conversion on " +
-          dataType + " page");
+      throw new UnsupportedOperationException("not support value conversion on "
+          + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 274b8a7..9b47e86 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -24,16 +24,15 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   // for string and decimal data
   private List<byte[]> byteArrayData;
 
-  SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     byteArrayData = new ArrayList<>();
   }
 
@@ -54,12 +53,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 96aeac2..829fad4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
@@ -32,36 +31,35 @@ import org.apache.carbondata.core.util.ByteUtil;
  */
 public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
-  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
-    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
-    initMemory();
+    this(columnPageEncoderMeta, pageSize, (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR));
   }
 
-  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      int capacity) throws MemoryException {
-    super(columnSpec, dataType, pageSize);
+  UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int capacity)
+      throws MemoryException {
+    super(columnPageEncoderMeta, pageSize);
     this.capacity = capacity;
     initMemory();
   }
 
   private void initMemory() throws MemoryException {
-    if (dataType == DataTypes.BYTE ||
-        dataType == DataTypes.SHORT ||
-        dataType == DataTypes.INT ||
-        dataType == DataTypes.LONG) {
-      int size = pageSize << dataType.getSizeBits();
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
+      int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-    } else if (DataTypes.isDecimal(dataType)) {
+    } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
@@ -255,8 +253,8 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
         }
         break;
       default:
-        throw new UnsupportedOperationException(
-            "not support value conversion on " + dataType + " page");
+        throw new UnsupportedOperationException("not support value conversion on "
+            + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index f75deb6..8a53840 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -20,13 +20,12 @@ package org.apache.carbondata.core.datastore.page;
 import java.io.IOException;
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -61,40 +60,41 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   private static final int floatBits = DataTypes.FLOAT.getSizeBits();
   private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();
 
-  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
-    if (dataType == DataTypes.BOOLEAN ||
-        dataType == DataTypes.BYTE ||
-        dataType == DataTypes.SHORT ||
-        dataType == DataTypes.INT ||
-        dataType == DataTypes.LONG ||
-        dataType == DataTypes.FLOAT ||
-        dataType == DataTypes.DOUBLE) {
-      int size = pageSize << dataType.getSizeBits();
+    super(columnPageEncoderMeta, pageSize);
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BOOLEAN ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
+      int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
       capacity = size;
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
       capacity = size;
-    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+    } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType()) ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.STRING) {
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
     totalLength = 0;
   }
 
-  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      int eachRowSize)
-      throws MemoryException {
-    this(columnSpec, dataType, pageSize);
+  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize,
+      int eachRowSize) throws MemoryException {
+    this(columnPageEncoderMeta, pageSize);
     this.eachRowSize = eachRowSize;
     totalLength = 0;
-    if (dataType == DataTypes.BYTE_ARRAY) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       memoryBlock =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
       baseAddress = memoryBlock.getBaseObject();
@@ -217,11 +217,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -272,7 +274,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -288,7 +291,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override public byte[] getDecimalPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -375,7 +379,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getLVFlattenedBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override public byte[] getComplexChildrenLVFlattenedBytePage() {
@@ -441,7 +446,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   public void freeMemory() {
@@ -455,68 +461,70 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override public void convertValue(ColumnPageValueConverter codec) {
     int endLoop = getEndLoop();
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << byteBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << shortBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << intBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << longBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << floatBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << doubleBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
       }
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
   }
 
   private int getEndLoop() {
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       return totalLength / ByteUtil.SIZEOF_BYTE;
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       return totalLength / ByteUtil.SIZEOF_SHORT;
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       return totalLength / ByteUtil.SIZEOF_SHORT_INT;
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       return totalLength / ByteUtil.SIZEOF_INT;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       return totalLength / ByteUtil.SIZEOF_LONG;
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       return totalLength / DataTypes.FLOAT.getSizeInBytes();
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       return totalLength / DataTypes.DOUBLE.getSizeInBytes();
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       return totalLength / eachRowSize;
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
   }
 
   @Override public byte[] compress(Compressor compressor) throws MemoryException, IOException {
-    if (UnsafeMemoryManager.isOffHeap()) {
+    if (UnsafeMemoryManager.isOffHeap() && compressor.supportUnsafe()) {
       // use raw compression and copy to byte[]
       int inputSize = totalLength;
-      int compressedMaxSize = compressor.maxCompressedLength(inputSize);
+      long compressedMaxSize = compressor.maxCompressedLength(inputSize);
       MemoryBlock compressed =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
       long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index ae57dcd..4693dba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * This extension uses unsafe memory to store page data, for variable length data type (string)
@@ -33,9 +32,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   /**
    * create a page
    */
-  UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
+    super(columnPageEncoderMeta, pageSize);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     baseAddress = memoryBlock.getBaseObject();
@@ -85,7 +84,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 4edd201..7f0b2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -64,13 +65,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   // size of the allocated memory, in bytes
   int capacity;
 
-  VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
-    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-        .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
+  VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
+        columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, ColumnType.MEASURE);
     try {
-      rowOffset =
-          ColumnPage.newPage(spec, DataTypes.INT, pageSize);
+      rowOffset = ColumnPage.newPage(
+          new ColumnPageEncoderMeta(spec, DataTypes.INT, columnPageEncoderMeta.getCompressorName()),
+          pageSize);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }
@@ -79,44 +81,51 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void setBytePage(byte[] byteData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setShortPage(short[] shortData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setShortIntPage(byte[] shortIntData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setIntPage(int[] intData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setLongPage(long[] longData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setFloatPage(float[] floatData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setDoublePage(double[] doubleData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
-      throws MemoryException {
+  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
+      String compressorName) throws MemoryException {
     DecimalConverterFactory.DecimalConverter decimalConverter =
         DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
             columnSpec.getScale());
@@ -124,10 +133,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     if (size < 0) {
       return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
           DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
-          CarbonCommonConstants.INT_SIZE_IN_BYTE);
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
     } else {
       // Here the size is always fixed.
-      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
+      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
     }
   }
 
@@ -135,23 +144,26 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
   static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      int lvLength) throws MemoryException {
-    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+      int lvLength, String compressorName) throws MemoryException {
+    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+        lvLength, compressorName);
   }
 
   /**
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
   static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int lvLength) throws MemoryException {
-    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+      byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException {
+    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+        lvLength, compressorName);
   }
 
   private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int size) throws MemoryException {
+      byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int offset;
     int rowId = 0;
@@ -165,9 +177,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+      page = new UnsafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+          rowId);
     } else {
-      page = new SafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+      page = new SafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+          rowId);
     }
 
     // set total length and rowOffset in page
@@ -181,13 +197,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   }
 
   private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, DataType dataType, int lvLength)
+      byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory correspondingly
     int rowId = 0;
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int length;
     int offset;
@@ -202,20 +219,19 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
       counter++;
     }
     rowOffset.putInt(counter, offset);
-    VarLengthColumnPageBase page =
-        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
-            offset);
-    return page;
+    return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+        lvLength, rowId, rowOffset, offset, compressorName);
   }
 
   private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, DataType dataType, int lvLength)
+      byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory correspondingly
     int rowId = 0;
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int length;
     int offset;
@@ -231,15 +247,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     }
     rowOffset.putInt(counter, offset);
 
-    VarLengthColumnPageBase page =
-        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
-             offset);
-    return page;
+    return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+        lvLength, rowId, rowOffset, offset, compressorName);
   }
 
   private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
       byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, ColumnPage rowOffset,
-      int offset) throws MemoryException {
+      int offset, String compressorName) throws MemoryException {
     int lvEncodedOffset;
     int length;
     int numRows = rowId;
@@ -247,9 +261,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, dataType, numRows, inputDataLength);
+      page = new UnsafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows,
+          inputDataLength);
     } else {
-      page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
+      page = new SafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows);
     }
 
     // set total length and rowOffset in page
@@ -269,32 +286,38 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void putByte(int rowId, byte value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putShort(int rowId, short value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putShortInt(int rowId, int value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putInt(int rowId, int value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putLong(int rowId, long value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   abstract void putBytesAtRow(int rowId, byte[] bytes);
@@ -317,72 +340,86 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public byte getByte(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short getShort(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int getShortInt(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int getInt(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long getLong(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short[] getShortPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getShortIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int[] getIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long[] getLongPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float[] getFloatPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double[] getDoublePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -445,7 +482,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index b5a63f8..2ed12a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -84,7 +84,8 @@ public abstract class ColumnPageEncoder {
   }
 
   private void fillBasicFields(ColumnPage inputPage, DataChunk2 dataChunk) {
-    dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta());
+    dataChunk.setChunk_meta(
+        CarbonMetadataUtil.getChunkCompressorMeta(inputPage.getColumnCompressorName()));
     dataChunk.setNumberOfRowsInpage(inputPage.getPageSize());
     dataChunk.setRowMajor(false);
   }
@@ -92,7 +93,8 @@ public abstract class ColumnPageEncoder {
   private void fillNullBitSet(ColumnPage inputPage, DataChunk2 dataChunk) {
     PresenceMeta presenceMeta = new PresenceMeta();
     presenceMeta.setPresent_bit_streamIsSet(true);
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        inputPage.getColumnCompressorName());
     presenceMeta.setPresent_bit_stream(
         compressor.compressByte(inputPage.getNullBits().toByteArray()));
     dataChunk.setPresence(presenceMeta);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 4e04186..971cf24 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -45,14 +45,15 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   // storage data type of this column, it could be different from data type in the column spec
   private DataType storeDataType;
 
-  // compressor name for compressing and decompressing this column
-  private String compressorName;
+  // compressor name for compressing and decompressing this column.
+  // Make it protected for RLEEncoderMeta
+  protected String compressorName;
 
   public ColumnPageEncoderMeta() {
   }
 
   public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
-      SimpleStatsResult stats, String compressorName) {
+      String compressorName) {
     if (columnSpec == null) {
       throw new IllegalArgumentException("columm spec must not be null");
     }
@@ -66,6 +67,11 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     this.storeDataType = storeDataType;
     this.compressorName = compressorName;
     setType(DataType.convertType(storeDataType));
+  }
+
+  public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
+      SimpleStatsResult stats, String compressorName) {
+    this(columnSpec, storeDataType, compressorName);
     if (stats != null) {
       setDecimal(stats.getDecimalCount());
       setMaxValue(stats.getMax());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 1cc2ba8..29772d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -20,8 +20,6 @@ package org.apache.carbondata.core.datastore.page.encoding;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
@@ -73,39 +71,36 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
       ColumnPage inputPage) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
     switch (columnSpec.getColumnType()) {
       case GLOBAL_DICTIONARY:
       case DIRECT_DICTIONARY:
       case PLAIN_VALUE:
         return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
       case COMPLEX:
-        return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
+        return new ComplexDimensionIndexCodec(false, false).createEncoder(null);
       default:
-        throw new RuntimeException("unsupported dimension type: " +
-            columnSpec.getColumnType());
+        throw new RuntimeException("unsupported dimension type: " + columnSpec.getColumnType());
     }
   }
 
   private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
     switch (dimensionSpec.getColumnType()) {
       case GLOBAL_DICTIONARY:
         return new DictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+            .createEncoder(null);
       case DIRECT_DICTIONARY:
         return new DirectDictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+            .createEncoder(null);
       case PLAIN_VALUE:
         return new HighCardDictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR,
-            compressor).createEncoder(null);
+            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR)
+            .createEncoder(null);
       default:
         throw new RuntimeException("unsupported dimension type: " +
             dimensionSpec.getColumnType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 8bc67c0..d119c8f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -64,8 +64,8 @@ public abstract class EncodingFactory {
   /**
    * Return new decoder based on encoder metadata read from file
    */
-  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
-      throws IOException {
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+      String compressor) throws IOException {
     assert (encodings.size() == 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
@@ -111,21 +111,20 @@ public abstract class EncodingFactory {
     } else {
       // for backward compatibility
       ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
-      return createDecoderLegacy(metadata);
+      return createDecoderLegacy(metadata, compressor);
     }
   }
 
   /**
    * Old way of creating decoder, based on algorithm
    */
-  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
     if (null == metadata) {
       throw new RuntimeException("internal error");
     }
     SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
     TableSpec.ColumnSpec spec =
         TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE);
-    String compressor = "snappy";
     DataType dataType = DataType.getDataType(metadata.getType());
     if (dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 0e8d1c0..bb928c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -67,16 +67,19 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
         input.convertValue(converter);
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
         return result;
@@ -92,7 +95,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index f20422c..ac9693d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -78,16 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
 
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
         input.convertValue(converter);
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
         return result;
@@ -96,7 +99,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType,
-            inputPage.getStatistics(), compressor.getName());
+            inputPage.getStatistics(), inputPage.getColumnCompressorName());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 6d7697b..028fa71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -59,15 +59,18 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -84,7 +87,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index cfc26c7..a9cf742 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -56,15 +56,18 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -81,7 +84,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 7e1e9dd..aa03ec1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -55,69 +54,53 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    // TODO: make compressor configurable in create table
-    return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
-  }
-
-  @Override
-  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    return new DirectDecompressor(meta);
-  }
-
-  private class DirectCompressor extends ColumnPageEncoder {
-
-    private Compressor compressor;
-
-    DirectCompressor(String compressorName) {
-      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
-    }
-
-    @Override
-    protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
-      return input.compress(compressor);
-    }
+    return new ColumnPageEncoder() {
 
-    @Override
-    protected List<Encoding> getEncodingList() {
-      List<Encoding> encodings = new ArrayList<>();
-      encodings.add(dataType == DataTypes.VARCHAR ?
-          Encoding.DIRECT_COMPRESS_VARCHAR :
-          Encoding.DIRECT_COMPRESS);
-      return encodings;
-    }
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
+        return input.compress(compressor);
+      }
 
-    @Override
-    protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-      return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
-          inputPage.getStatistics(), compressor.getName());
-    }
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(dataType == DataTypes.VARCHAR ?
+            Encoding.DIRECT_COMPRESS_VARCHAR :
+            Encoding.DIRECT_COMPRESS);
+        return encodings;
+      }
 
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
+            inputPage.getStatistics(), inputPage.getColumnCompressorName());
+      }
+    };
   }
 
-  private class DirectDecompressor implements ColumnPageDecoder {
-
-    private ColumnPageEncoderMeta meta;
-
-    DirectDecompressor(ColumnPageEncoderMeta meta) {
-      this.meta = meta;
-    }
-
-    @Override
-    public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-      ColumnPage decodedPage;
-      if (DataTypes.isDecimal(dataType)) {
-        decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-      } else {
-        decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+  @Override
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+    return new ColumnPageDecoder() {
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+        ColumnPage decodedPage;
+        if (DataTypes.isDecimal(dataType)) {
+          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+        } else {
+          decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+        }
+        return LazyColumnPage.newPage(decodedPage, converter);
       }
-      return LazyColumnPage.newPage(decodedPage, converter);
-    }
 
-    @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
         throws MemoryException, IOException {
-      return LazyColumnPage
-          .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
-    }
+        return LazyColumnPage.newPage(
+            ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+      }
+    };
   }
 
   private ColumnPageValueConverter converter = new ColumnPageValueConverter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index e37b8f6..cc044cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -31,9 +32,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class ComplexDimensionIndexCodec extends IndexStorageCodec {
 
-  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -49,6 +49,8 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec {
         IndexStorage indexStorage =
             new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         byte[] compressed = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
         super.compressedDataPage = compressed;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index d157654..66f5f1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -32,8 +33,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class DictDimensionIndexCodec extends IndexStorageCodec {
 
-  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -54,6 +55,8 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
           indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 1e5015b..a130cbd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -32,9 +33,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
 
-  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -55,6 +55,8 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
           indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index f9c124f..bce8523 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -37,8 +38,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
   private boolean isVarcharType;
 
   public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      boolean isVarcharType, Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+      boolean isVarcharType) {
+    super(isSort, isInvertedIndex);
     this.isVarcharType = isVarcharType;
   }
 
@@ -63,6 +64,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
               new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
index cb6b387..13a9215 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
@@ -17,20 +17,17 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
 
-import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 
 public abstract class IndexStorageCodec implements ColumnPageCodec {
-  protected Compressor compressor;
   protected boolean isSort;
   protected boolean isInvertedIndex;
 
-  IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+  IndexStorageCodec(boolean isSort, boolean isInvertedIndex) {
     this.isSort = isSort;
     this.isInvertedIndex = isInvertedIndex;
-    this.compressor = compressor;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index fa03809..e7d4118 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -66,7 +66,7 @@ public class RLECodec implements ColumnPageCodec {
   public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
     assert meta instanceof RLEEncoderMeta;
     RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
-    return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize());
+    return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize(), meta.getCompressorName());
   }
 
   // This codec supports integral type only
@@ -151,7 +151,10 @@ public class RLECodec implements ColumnPageCodec {
     @Override
     protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
       return new RLEEncoderMeta(inputPage.getColumnSpec(),
-          inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
+          inputPage.getDataType(),
+          inputPage.getPageSize(),
+          inputPage.getStatistics(),
+          inputPage.getColumnCompressorName());
     }
 
     private void putValue(Object value) throws IOException {
@@ -281,11 +284,13 @@ public class RLECodec implements ColumnPageCodec {
 
     private TableSpec.ColumnSpec columnSpec;
     private int pageSize;
+    private String compressorName;
 
-    private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) {
+    private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize, String compressorName) {
       validateDataType(columnSpec.getSchemaDataType());
       this.columnSpec = columnSpec;
       this.pageSize = pageSize;
+      this.compressorName = compressorName;
     }
 
     @Override
@@ -293,7 +298,8 @@ public class RLECodec implements ColumnPageCodec {
         throws MemoryException, IOException {
       DataType dataType = columnSpec.getSchemaDataType();
       DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
-      ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
+      ColumnPage resultPage = ColumnPage.newPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
         decodeBytePage(in, resultPage);
       } else if (dataType == DataTypes.SHORT) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
index 8871671..25533f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -39,8 +39,8 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
   }
 
   public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      SimpleStatsResult stats) {
-    super(columnSpec, dataType, stats, "");
+      SimpleStatsResult stats, String compressorName) {
+    super(columnSpec, dataType, stats, compressorName);
     this.pageSize = pageSize;
   }