Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/12 09:55:22 UTC

[1/4] carbondata git commit: [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

Repository: carbondata
Updated Branches:
  refs/heads/master 7b31b9168 -> 8f08c4abb


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 26ee65a..54dd0aa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -179,6 +179,8 @@ public class CarbonFactDataHandlerModel {
 
   private List<Integer> varcharDimIdxInNoDict;
 
+  private String columnCompressor;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -284,6 +286,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.taskExtension = taskExtension;
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+    carbonFactDataHandlerModel.columnCompressor = configuration.getColumnCompressor();
 
     if (listener == null) {
       listener = new DataMapWriterListener();
@@ -364,6 +367,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
 
     carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
     DataMapWriterListener listener = new DataMapWriterListener();
@@ -700,5 +704,12 @@ public class CarbonFactDataHandlerModel {
     return varcharDimIdxInNoDict;
   }
 
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }
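
A note for readers tracing the data flow: the compressor name enters this model either from CarbonDataLoadConfiguration (first hunk) or from CarbonLoadModel (second hunk), and downstream writers read it back through the new getter. A minimal sketch, assuming the model has already been populated:

    // resolve the concrete Compressor from the name stored in the model;
    // getCompressor(String) is the factory overload used throughout this commit
    String compressorName = carbonFactDataHandlerModel.getColumnCompressor();
    Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);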
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index c46b2c2..73746d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
@@ -85,12 +86,16 @@ public class TablePage {
 
   // used for complex columns to deserialize the byte array in the input CarbonRow
   private Map<Integer, GenericDataType> complexIndexMap = null;
+  // name of the compressor used to compress the column data;
+  // currently all columns share the same compressor.
+  private String columnCompressor;
 
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
     this.pageSize = pageSize;
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
     TableSpec tableSpec = model.getTableSpec();
+    this.columnCompressor = model.getColumnCompressor();
 
     dictDimensionPages = new ColumnPage[numDictDimension];
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
@@ -102,7 +107,8 @@ public class TablePage {
       ColumnPage page;
       if (ColumnType.GLOBAL_DICTIONARY == columnType
           || ColumnType.DIRECT_DICTIONARY == columnType) {
-        page = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+        page = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize);
         page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY));
         dictDimensionPages[tmpNumDictDimIdx++] = page;
       } else {
@@ -113,11 +119,13 @@ public class TablePage {
         if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
           dataType = DataTypes.VARCHAR;
         }
+        ColumnPageEncoderMeta columnPageEncoderMeta =
+            new ColumnPageEncoderMeta(spec, dataType, columnCompressor);
         if (null != localDictionaryGenerator) {
-          page = ColumnPage
-              .newLocalDictPage(spec, dataType, pageSize, localDictionaryGenerator, false);
+          page = ColumnPage.newLocalDictPage(
+              columnPageEncoderMeta, pageSize, localDictionaryGenerator, false);
         } else {
-          page = ColumnPage.newPage(spec, dataType, pageSize);
+          page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
         }
         if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
@@ -136,15 +144,15 @@ public class TablePage {
     measurePages = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
     for (int i = 0; i < measurePages.length; i++) {
-      TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
+      ColumnPageEncoderMeta columnPageEncoderMeta = new ColumnPageEncoderMeta(
+          model.getTableSpec().getMeasureSpec(i), dataTypes[i], columnCompressor);
       ColumnPage page;
-      if (DataTypes.isDecimal(spec.getSchemaDataType())) {
-        page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
+      if (DataTypes.isDecimal(columnPageEncoderMeta.getSchemaDataType())) {
+        page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize);
       } else {
-        page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
+        page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
       }
-      page.setStatsCollector(
-          PrimitivePageStatsCollector.newInstance(dataTypes[i]));
+      page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i]));
       measurePages[i] = page;
     }
 
@@ -239,8 +247,8 @@ public class TablePage {
       complexDataType.getComplexColumnInfo(complexColumnInfoList);
       complexDimensionPages[index] = new ComplexColumnPage(complexColumnInfoList);
       try {
-        complexDimensionPages[index]
-            .initialize(model.getColumnLocalDictGenMap(), pageSize);
+        complexDimensionPages[index].initialize(
+            model.getColumnLocalDictGenMap(), pageSize, columnCompressor);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
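
The pattern repeated throughout this file: every page construction now goes through ColumnPageEncoderMeta, which bundles the column spec, the data type and the compressor name into a single object. A hedged sketch, assuming spec, pageSize and columnCompressor are in scope:

    ColumnPageEncoderMeta meta =
        new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor);
    ColumnPage page = ColumnPage.newPage(meta, pageSize);
    // decimal and local-dictionary pages follow the same shape:
    //   ColumnPage.newDecimalPage(meta, pageSize)
    //   ColumnPage.newLocalDictPage(meta, pageSize, localDictionaryGenerator, false)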

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 6325528..f7ce1f2 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -27,6 +27,8 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
@@ -35,6 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -89,6 +92,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   private int measureCount;
   private DataType[] measureDataTypes;
   private StreamBlockletWriter output = null;
+  private String compressorName;
 
   // data write
   private String segmentDir;
@@ -155,25 +159,41 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
     converter.initialize();
-    // initialize encoder
-    nullBitSet = new BitSet(dataFields.length);
-    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
-        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
-    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
-        isNoDictionaryDimensionColumn.length, measureCount,
-        measureDataTypes);
-    // initialize data writer
+
+    // initialize data writer and compressor
     String filePath = segmentDir + File.separator + fileName;
     FileFactory.FileType fileType = FileFactory.getFileType(filePath);
     CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
     if (carbonFile.exists()) {
      // if the file already exists, use the append API
       outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+      // get the compressor from the file header. In a legacy store,
+      // the compressor name is not set and the snappy compressor is used
+      FileHeader header = new CarbonHeaderReader(filePath).readHeader();
+      if (header.isSetCompressor_name()) {
+        compressorName = header.getCompressor_name();
+      } else {
+        compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+      }
     } else {
      // if the file does not exist, use the create API
       outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+      compressorName = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+          CarbonCommonConstants.COMPRESSOR);
+      if (null == compressorName) {
+        compressorName = CompressorFactory.getInstance().getCompressor().getName();
+      }
       writeFileHeader();
     }
+
+    // initialize encoder
+    nullBitSet = new BitSet(dataFields.length);
+    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
+        isNoDictionaryDimensionColumn.length, measureCount,
+        measureDataTypes, compressorName);
+
     isFirstRow = false;
   }
 
@@ -295,6 +315,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     fileHeader.setIs_footer_present(false);
     fileHeader.setIs_splitable(true);
     fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+    fileHeader.setCompressor_name(compressorName);
     outputStream.write(CarbonUtil.getByteArray(fileHeader));
   }
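
Condensed, the writer resolves its compressor in two branches: appending to an existing stream file trusts the persisted FileHeader (falling back to snappy for legacy files that never wrote a name), while a fresh file consults the table properties and then the system default. A sketch using only calls from the hunks above; tableProperties stands in for carbonTable.getTableInfo().getFactTable().getTableProperties():

    if (carbonFile.exists()) {
      // append: honor whatever the file was originally written with
      FileHeader header = new CarbonHeaderReader(filePath).readHeader();
      compressorName = header.isSetCompressor_name()
          ? header.getCompressor_name()
          : CompressorFactory.SupportedCompressor.SNAPPY.getName(); // legacy store
    } else {
      // create: table property first, then the system-wide default
      compressorName = tableProperties.get(CarbonCommonConstants.COMPRESSOR);
      if (null == compressorName) {
        compressorName = CompressorFactory.getInstance().getCompressor().getName();
      }
    }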
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
index 5c7ad5e..0467fe4 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
@@ -41,13 +41,13 @@ public class StreamBlockletReader {
   private final long limitStart;
   private final long limitEnd;
   private boolean isAlreadySync = false;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private Compressor compressor;
   private int rowNums = 0;
   private int rowIndex = 0;
   private boolean isHeaderPresent;
 
   public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
-      boolean isHeaderPresent) {
+      boolean isHeaderPresent, String compressorName) {
     this.syncMarker = syncMarker;
     syncLen = syncMarker.length;
     syncBuffer = new byte[syncLen];
@@ -55,6 +55,7 @@ public class StreamBlockletReader {
     limitStart = limit;
     limitEnd = limitStart + syncLen;
     this.isHeaderPresent = isHeaderPresent;
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
   }
 
   private void ensureCapacity(int capacity) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index d4322b4..c538451 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -47,7 +47,7 @@ public class StreamBlockletWriter {
   private int rowSize;
   private int count = 0;
   private int rowIndex = -1;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private Compressor compressor;
 
   private int dimCountWithoutComplex;
   private int measureCount;
@@ -60,7 +60,7 @@ public class StreamBlockletWriter {
   private BlockletMinMaxIndex blockletMinMaxIndex;
 
   StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex,
-      int measureCount, DataType[] measureDataTypes) {
+      int measureCount, DataType[] measureDataTypes, String compressorName) {
     buffer = new byte[maxSize];
     this.maxSize = maxSize;
     this.maxRowNum = maxRowNum;
@@ -68,6 +68,7 @@ public class StreamBlockletWriter {
     this.dimCountWithoutComplex = dimCountWithoutComplex;
     this.measureCount = measureCount;
     this.measureDataTypes = measureDataTypes;
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     initializeStatsCollector();
   }
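
Reader and writer must agree on the codec, which is why the name round-trips through the FileHeader rather than coming from a global factory default. A sketch of the paired constructors, with the surrounding arguments assumed to be in scope:

    StreamBlockletWriter output = new StreamBlockletWriter(maxCacheSize, maxRowNums,
        rowBufferSize, dimCountWithoutComplex, measureCount, measureDataTypes,
        compressorName);
    StreamBlockletReader input = new StreamBlockletReader(syncMarker, in, limit,
        isHeaderPresent, compressorName);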
 


[2/4] carbondata git commit: [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
index 419fd9e..d6c2352 100644
--- a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
 import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
@@ -57,14 +58,17 @@ public class PageLevelDictionary {
   private DataType dataType;
 
   private boolean isComplexTypePrimitive;
+  // compressor to be used for the dictionary; it is the same as the column compressor.
+  private String columnCompressor;
 
   public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName,
-      DataType dataType, boolean isComplexTypePrimitive) {
+      DataType dataType, boolean isComplexTypePrimitive, String columnCompressor) {
     this.localDictionaryGenerator = localDictionaryGenerator;
     this.usedDictionaryValues = new BitSet();
     this.columnName = columnName;
     this.dataType = dataType;
     this.isComplexTypePrimitive = isComplexTypePrimitive;
+    this.columnCompressor = columnCompressor;
   }
 
   /**
@@ -111,8 +115,9 @@ public class PageLevelDictionary {
     }
     TableSpec.ColumnSpec spec =
         TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType);
-    ColumnPage dictionaryColumnPage =
-        ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality());
+    ColumnPage dictionaryColumnPage = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor),
+        usedDictionaryValues.cardinality());
     // TODO support data type specific stats collector for numeric data types
     dictionaryColumnPage.setStatsCollector(new DummyStatsCollector());
     int rowId = 0;
@@ -139,8 +144,9 @@ public class PageLevelDictionary {
     // get encoded dictionary values
     LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage);
     // set compressed dictionary values
-    localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor()
-        .compressByte(usedDictionaryValues.toByteArray()));
+    localDictionaryChunk.setDictionary_values(
+        CompressorFactory.getInstance().getCompressor(columnCompressor).compressByte(
+            usedDictionaryValues.toByteArray()));
     // free the dictionary page memory
     dictionaryColumnPage.freeMemory();
     return localDictionaryChunk;
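
The dictionary itself is now compressed with the column's compressor, so both must be created with the same name. A usage sketch matching the tests later in this mail:

    PageLevelDictionary dictionary = new PageLevelDictionary(
        localDictionaryGenerator, columnName, DataTypes.STRING, false, columnCompressor);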

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 4dc1fbc..5a19073 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -79,6 +79,7 @@ public class DataType implements Serializable {
 
   public static char convertType(DataType dataType) {
     if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.BOOLEAN ||
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.SHORT_INT ||
         dataType == DataTypes.INT ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 2285284..7cc2b09 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -717,16 +716,13 @@ public class QueryUtil {
    * Below method will be used to convert the thrift presence meta to wrapper
    * presence meta
    *
-   * @param presentMetadataThrift
    * @return wrapper presence meta
    */
   public static BitSet getNullBitSet(
-      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift, Compressor compressor) {
     final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream();
     if (null != present_bit_stream) {
-      return BitSet
-          .valueOf(compressor.unCompressByte(present_bit_stream));
+      return BitSet.valueOf(compressor.unCompressByte(present_bit_stream));
     } else {
       return new BitSet(1);
     }
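
Callers now supply the Compressor explicitly, resolved once from the chunk metadata (see getCompressorNameFromChunkMeta further below) instead of a process-wide default. A hedged caller-side sketch:

    Compressor compressor =
        CompressorFactory.getInstance().getCompressor(compressorName);
    BitSet nullBitSet = QueryUtil.getNullBitSet(presentMetadataThrift, compressor);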

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index f14610c..1832cf5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -357,8 +357,8 @@ public class BlockletDataMapUtil {
       columnSchema.write(dataOutput);
     }
     byte[] byteArray = stream.toByteArray();
-    // Compress with snappy to reduce the size of schema
-    return CompressorFactory.getInstance().getCompressor().compressByte(byteArray);
+    // Compress to reduce the size of schema
+    return CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray);
   }
 
   /**
@@ -369,7 +369,8 @@ public class BlockletDataMapUtil {
    */
   public static List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
     // uncompress it.
-    schemaArray = CompressorFactory.getInstance().getCompressor().unCompressByte(schemaArray);
+    schemaArray = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(
+        schemaArray);
     ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
     DataInput schemaInput = new DataInputStream(schemaStream);
     List<ColumnSchema> columnSchemas = new ArrayList<>();
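
Note the asymmetry this hunk makes explicit: the cached datamap schema stays pinned to snappy regardless of the table's column compressor, which keeps the compress/uncompress pair symmetric across versions. A sketch of that pair:

    byte[] compressed =
        CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(bytes);
    byte[] restored =
        CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(compressed);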

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 3473aca..4efd5ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -39,6 +39,8 @@ public final class ByteUtil {
 
   public static final int SIZEOF_LONG = 8;
 
+  public static final int SIZEOF_FLOAT = 4;
+
   public static final int SIZEOF_DOUBLE = 8;
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 571a247..4be4f78 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
 import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -249,17 +250,36 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Right now it is set to default values. We may use this in future
+   * set the compressor.
+   * before 1.5.0, we set an enum 'compression_codec';
+   * after 1.5.0, we use string 'compressor_name' instead
    */
-  public static ChunkCompressionMeta getSnappyChunkCompressionMeta() {
+  public static ChunkCompressionMeta getChunkCompressorMeta(String compressorName) {
     ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
-    chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
+    // we will not use this field any longer and will use compressor_name instead,
+    // but in the thrift definition this field is required, so we cannot set it to null;
+    // otherwise it would cause a deserialization error at runtime (a required field cannot be null).
+    chunkCompressionMeta.setCompression_codec(CompressionCodec.DEPRECATED);
+    chunkCompressionMeta.setCompressor_name(compressorName);
     chunkCompressionMeta.setTotal_compressed_size(0);
     chunkCompressionMeta.setTotal_uncompressed_size(0);
     return chunkCompressionMeta;
   }
 
   /**
+   * get the compressor name from chunk meta
+   * before 1.5.0, only snappy was supported and there was no compressor_name field;
+   * after 1.5.0, we directly get the compressor from the compressor_name field
+   */
+  public static String getCompressorNameFromChunkMeta(ChunkCompressionMeta chunkCompressionMeta) {
+    if (chunkCompressionMeta.isSetCompressor_name()) {
+      return chunkCompressionMeta.getCompressor_name();
+    } else {
+      // this is for legacy store before 1.5.0
+      return CompressorFactory.SupportedCompressor.SNAPPY.getName();
+    }
+  }
+  /**
    * Below method will be used to get the index header
    *
    * @param columnCardinality cardinality of each column
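
Taken together, the two helpers give a backward-compatible round trip. A minimal sketch:

    // write side: record the name, keep the deprecated enum populated
    ChunkCompressionMeta meta = CarbonMetadataUtil.getChunkCompressorMeta("zstd");
    // read side: new files yield "zstd"; a legacy meta without
    // compressor_name would yield "snappy" instead
    String name = CarbonMetadataUtil.getCompressorNameFromChunkMeta(meta);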

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
index 8360e02..acdfcf3 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
@@ -45,8 +45,10 @@ public class RLECodecTest {
     TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException {
       this.inputByteData = inputByteData;
       inputBytePage = ColumnPage.newPage(
-          TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
-          DataTypes.BYTE, inputByteData.length);
+          new ColumnPageEncoderMeta(
+              TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
+              DataTypes.BYTE, "snappy"),
+          inputByteData.length);
       inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.BYTE));
       for (int i = 0; i < inputByteData.length; i++) {
         inputBytePage.putData(i, inputByteData[i]);
@@ -131,7 +133,7 @@ public class RLECodecTest {
     RLECodec codec = new RLECodec();
     RLEEncoderMeta meta = new RLEEncoderMeta(
         TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
-        DataTypes.BYTE, expectedDecodedBytes.length, null);
+        DataTypes.BYTE, expectedDecodedBytes.length, null, "snappy");
     ColumnPageDecoder decoder = codec.createDecoder(meta);
     ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length);
     byte[] decoded = page.getBytePage();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
index 3337a7d..93c770b 100644
--- a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
+++ b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
@@ -40,12 +40,14 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class TestPageLevelDictionary {
+  private String compressorName = CompressorFactory.getInstance().getCompressor(
+      CarbonCommonConstants.DEFAULT_COMPRESSOR).getName();
 
   @Test public void testPageLevelDictionaryGenerateDataIsGenertingProperDictionaryValues() {
     LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
     String columnName = "column1";
     PageLevelDictionary pageLevelDictionary = new PageLevelDictionary(generator, columnName,
-        DataTypes.STRING, false);
+        DataTypes.STRING, false, compressorName);
     try {
       for (int i = 1; i <= 1000; i++) {
         Assert.assertTrue((i + 1) == pageLevelDictionary.getDictionaryValue(("" + i).getBytes()));
@@ -59,7 +61,8 @@ public class TestPageLevelDictionary {
   @Test public void testPageLevelDictionaryContainsOnlyUsedDictionaryValues() {
     LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
     String columnName = "column1";
-    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     byte[][] validateData = new byte[500][];
     try {
       for (int i = 1; i <= 500; i++) {
@@ -74,7 +77,8 @@ public class TestPageLevelDictionary {
     } catch (DictionaryThresholdReachedException e) {
       Assert.assertTrue(false);
     }
-    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     try {
       for (int i = 1; i <= 500; i++) {
         byte[] data = ("vikas" + i).getBytes();
@@ -94,7 +98,8 @@ public class TestPageLevelDictionary {
       EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
       List<ByteBuffer> encoderMetas =
           localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = encodingFactory.createDecoder(
+          encodings, encoderMetas, compressorName);
       ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0,
           localDictionaryChunkForBlocklet.getDictionary_data().length);
       for (int i = 0; i < 500; i++) {
@@ -111,7 +116,8 @@ public class TestPageLevelDictionary {
   public void testPageLevelDictionaryContainsOnlyUsedDictionaryValuesWhenMultiplePagesUseSameDictionary() {
     LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
     String columnName = "column1";
-    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     byte[][] validateData = new byte[10][];
     int index = 0;
     try {
@@ -128,7 +134,8 @@ public class TestPageLevelDictionary {
     } catch (DictionaryThresholdReachedException e) {
       Assert.assertTrue(false);
     }
-    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     try {
       for (int i = 1; i <= 5; i++) {
         byte[] data = ("vikas" + i).getBytes();
@@ -174,10 +181,11 @@ public class TestPageLevelDictionary {
       EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
       List<ByteBuffer> encoderMetas =
           localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = encodingFactory.createDecoder(
+          encodings, encoderMetas, compressorName);
       ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0,
           localDictionaryChunkForBlocklet.getDictionary_data().length);
-      BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+      BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor(compressorName)
           .unCompressByte(localDictionaryChunkForBlocklet.getDictionary_values()));
       Assert.assertTrue(bitSet.cardinality()==validateData.length);
       for(int i =0; i<validateData.length;i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index c8c74f2..c6b0fcb 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -7,7 +7,7 @@
     the License.  You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
-    
+
     Unless required by applicable law or agreed to in writing, software 
     distributed under the License is distributed on an "AS IS" BASIS, 
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -92,6 +92,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.load.directWriteHdfs.enabled | false | During data load all the carbondata files are written to local disk and finally copied to the target location in HDFS. Enabling this parameter makes carbondata files be written directly onto the target HDFS location, bypassing the local disk. **NOTE:** Writing directly to HDFS saves local disk IO (once for writing the files and again for copying to HDFS), thereby improving performance. But the drawback is that when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS location until cleared during the next data load or by running the *CLEAN FILES* DDL command |
 | carbon.options.serialization.null.format | \N | Based on the business scenarios, some columns might need to be loaded with null values. As null values cannot be written in csv files, some special characters might be adopted to specify them. This configuration can be used to specify the null value format in the data being loaded. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure the memory footprint is within limits. When the ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size***, which is based on row count, the size occupied in memory is used to determine when to flush data pages to intermediate temp files. This configuration determines the memory to be used for storing data pages in memory. **NOTE:** Configuring a higher value ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO. Based on the memory availability in the nodes of the cluster, configure the values accordingly. |
+| carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. |
 
 ## Compaction Configuration
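
The property documented above is read through CarbonProperties, so it can be switched per load without altering the table. A minimal sketch mirroring the tests added later in this commit ("snappy" and "zstd" are the two supported values):

    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.COMPRESSOR, "zstd");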
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index a495b6d..2423ffa 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -65,10 +65,12 @@ enum SortState{
 }
 
 /**
- * Compressions supported by CarbonData.
+ * Compressions for column page supported by CarbonData.
  */
 enum CompressionCodec{
     SNAPPY = 0;
+    //** Since 1.5.0 this CompressionCodec is no longer used, but because it is a required field in some structures, we cannot remove it. A DEPRECATED entry is added here to alert anyone who tries to use it **//
+    DEPRECATED = 1;
 }
 
 /**
@@ -82,6 +84,8 @@ struct ChunkCompressionMeta{
     2: required i64 total_uncompressed_size;
     /** Total byte size of all compressed pages in this column chunk (including the headers) **/
     3: required i64 total_compressed_size;
+    /** compressor name for the chunk, introduced in 1.5.0 to make compression for the final store more extensible. Readers should check this compressor_name first; if it is not set (legacy store), snappy is assumed **/
+    4: optional string compressor_name;
 }
 
 /**
@@ -212,6 +216,7 @@ struct FileHeader{
 	4: optional i64 time_stamp; // Timestamp to compare column schema against master schema
 	5: optional bool is_splitable; // Whether file is splitable or not
 	6: optional binary sync_marker; // 16 bytes sync marker
+  7: optional string compressor_name;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 2d4f370..28817e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -299,6 +300,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
     model.setCarbonTransactionalTable(true);
     CarbonTable carbonTable = getCarbonTable(conf);
+    String columnCompressor = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+        CarbonCommonConstants.COMPRESSOR);
+    if (null == columnCompressor) {
+      columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+    }
+    model.setColumnCompressor(columnCompressor);
     model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
     model.setTablePath(getTablePath(conf));
     setFileHeader(conf, model);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 935c52d..7cd241a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -134,6 +135,12 @@ public class StoreCreator {
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
     CarbonLoadModel loadModel = new CarbonLoadModel();
+    String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
+        CarbonCommonConstants.COMPRESSOR);
+    if (columnCompressor == null) {
+      columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+    }
+    loadModel.setColumnCompressor(columnCompressor);
     loadModel.setCarbonDataLoadSchema(schema);
     loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 895d6a5..4b973a1 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier, ReverseDictionary}
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.{AtomicFileOperationFactory, AtomicFileOperations, FileWriteOperation}
 import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
@@ -83,6 +84,11 @@ object CarbonDataStoreCreator {
       writeDictionary(dataFilePath, table, absoluteTableIdentifier)
       val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
       val loadModel: CarbonLoadModel = new CarbonLoadModel()
+      import scala.collection.JavaConverters._
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor().getName())
+      loadModel.setColumnCompressor(columnCompressor)
       loadModel.setCarbonDataLoadSchema(schema)
       loadModel.setDatabaseName(
         absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
new file mode 100644
index 0000000..628a0dc
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.concurrent.{ExecutorService, Executors, Future}
+import java.util.Calendar
+
+import scala.util.Random
+
+import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintField: Long,
+    doubleField: Double, stringField: String, timestampField: String, decimalField: Double,
+    dateField: String, charField: String, floatField: Float, stringDictField: String,
+    stringSortField: String, stringLocalDictField: String, longStringField: String)
+
+class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  private val tableName = "load_test_with_compressor"
+  private var executorService: ExecutorService = _
+  private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"
+
+  override protected def beforeAll(): Unit = {
+    executorService = Executors.newFixedThreadPool(3)
+    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  override protected def afterAll(): Unit = {
+    executorService.shutdown()
+    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+
+  override protected def afterEach(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,
+      CarbonCommonConstants.DEFAULT_COMPRESSOR)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+      CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
+
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+
+  private def createTable(streaming: Boolean = false, columnCompressor: String = ""): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName(
+         |    booleanField boolean,
+         |    shortField smallint,
+         |    intField int,
+         |    bigintField bigint,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField string,
+         |    floatField float,
+         |    stringDictField string,
+         |    stringSortField string,
+         |    stringLocalDictField string,
+         |    longStringField string
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         |  ${if (StringUtils.isBlank(columnCompressor)) "" else s"'${CarbonCommonConstants.COMPRESSOR}'='$columnCompressor',"}
+         |  ${if (streaming) "" else s"'LONG_STRING_COLUMNS'='longStringField',"}
+         |  'SORT_COLUMNS'='stringSortField',
+         |  'DICTIONARY_INCLUDE'='stringDictField',
+         |  'local_dictionary_enable'='true',
+         |  'local_dictionary_threshold'='10000',
+         |  'local_dictionary_include'='stringLocalDictField' ${if (streaming) s", 'STREAMING'='true'" else ""})
+       """.stripMargin)
+  }
+
+  private def loadData(): Unit = {
+    sql(
+      s"""
+         | INSERT INTO TABLE $tableName VALUES
+         |  (true,1,11,101,41.4,'string1','2015/4/23 12:01:01',12.34,'2015/4/23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'),
+         | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
+         |  (true,3,13,163,43.4,'string3','2015/7/26 12:01:06',34.56,'2015/7/26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'),
+         | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
+       """.stripMargin)
+    sql(
+      s"""
+         | INSERT INTO TABLE $tableName VALUES
+         |  (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015/4/23 12:01:01',${Double.MinValue + 2},'2015/4/23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'),
+         | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
+         |  (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015/7/26 12:01:06',${Double.MinValue + 2},'2015/7/26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'),
+         | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
+       """.stripMargin)
+  }
+
+  test("test data loading with snappy compressor and offheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test data loading with zstd compressor and offheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test data loading with zstd compressor and onheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test current zstd compressor on legacy store with snappy") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test current snappy compressor on legacy store with zstd") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test compaction with different compressor for each load") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+
+    // there are 8 loads
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
+    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8)
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    sql(s"CLEAN FILES FOR TABLE $tableName")
+    // after compaction and clean files, there should be only one segment
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
+    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
+  }
+
+  test("test data loading with unsupported compressor and onheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
+    createTable()
+    val exception = intercept[UnsupportedOperationException] {
+      loadData()
+    }
+    assert(exception.getMessage.contains("Invalid compressor type provided"))
+  }
+
+  test("test compaction with unsupported compressor") {
+    createTable()
+    loadData()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
+    val exception = intercept[UnsupportedOperationException] {
+      sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    }
+    assert(exception.getMessage.contains("Invalid compressor type provided"))
+  }
+
+  private def generateAllDataTypeDF(lineNum: Int) = {
+    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val calendar = Calendar.getInstance()
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to lineNum)
+      .map { p =>
+        calendar.add(Calendar.HOUR, p)
+        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue).toShort, Random.nextInt(), Random.nextLong(),
+          Random.nextDouble(), Random.nextString(6), tsFormat.format(calendar.getTime), 0.01 * p,
+          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
+          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(33000))
+      }
+      .toDF()
+      .cache()
+  }
+
+  test("test data loading & compaction with more pages and change the compressor during loading") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "2000")
+    val lineNum = 5000
+    val df = generateAllDataTypeDF(lineNum)
+
+    def loadDataAsync(): Future[_] = {
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          df.write
+            .format("carbondata")
+            .option("tableName", tableName)
+            .mode(SaveMode.Append)
+            .save()
+        }
+      })
+    }
+
+    createTable()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    var future = loadDataAsync()
+    // change the compressor randomly during loading
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    future = loadDataAsync()
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+
+    def compactAsync(): Future[_] = {
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          sql(s"ALTER TABLE $tableName COMPACT 'MAJOR'")
+        }
+      })
+    }
+
+    // change the compressor randomly during compaction
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    future = compactAsync()
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+  }
+
+  test("test creating table with specified compressor") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    // the system configuration for compressor is snappy
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    // create table with zstd as compressor
+    createTable(columnCompressor = "zstd")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
+    val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
+    assert("zstd".equalsIgnoreCase(tableColumnCompressor))
+  }
+
+  test("test creating table with unsupported compressor") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    // the system configuration for compressor is snappy
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    // create table with unsupported compressor
+    val exception = intercept[InvalidConfigurationException] {
+      createTable (columnCompressor = "fakecompressor")
+    }
+    assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
+  }
+
+  private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
+      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
+    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val calendar = Calendar.getInstance()
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to lineNum)
+      .map { p =>
+        calendar.add(Calendar.HOUR, p)
+        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue / 2).toShort, Random.nextInt(), Random.nextLong(),
+          Random.nextDouble(), RandomStringUtils.randomAlphabetic(6), tsFormat.format(calendar.getTime), 0.01 * p,
+          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
+          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(3))
+      }
+      .toDF()
+      .write
+      .option("header", "false")
+      .mode(saveMode)
+      .csv(csvDir)
+  }
+
+  test("test streaming ingestion with different compressor for each mini-batch") {
+    createTable(streaming = true)
+    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(sqlContext.sparkSession)
+    val lineNum = 10
+    val dataLocation = new File(csvDataDir).getCanonicalPath
+
+    def doStreamingIngestionThread(): Thread = {
+      new Thread() {
+        override def run(): Unit = {
+          var streamingQuery: StreamingQuery = null
+          try {
+            streamingQuery = sqlContext.sparkSession.readStream
+              .text(dataLocation)
+              .writeStream
+              .format("carbondata")
+              .trigger(ProcessingTime(s"1 seconds"))
+              .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+              .option("dbName", "default")
+              .option("tableName", tableName)
+              .option(CarbonStreamParser.CARBON_STREAM_PARSER, CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
+              .start()
+            streamingQuery.awaitTermination()
+          } catch {
+            case ex: Exception => LOGGER.error(ex)
+          } finally {
+            if (null != streamingQuery) {
+              streamingQuery.stop()
+            }
+          }
+        }
+      }
+    }
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    generateAllDataTypeFiles(lineNum, dataLocation)
+    val thread = doStreamingIngestionThread()
+    thread.start()
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(40 * 1000)
+    thread.interrupt()
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
+      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+
+    sql(s"alter table $tableName compact 'streaming'")
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
+      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a03a5eb..643471c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.sdk.file._
 
@@ -2604,16 +2604,18 @@ object testUtil{
       data: Array[String]): Boolean = {
     val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
     if (null != local_dictionary) {
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
       val encodings = local_dictionary.getDictionary_meta.encoders
       val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
       val encodingFactory = DefaultEncodingFactory.getInstance
-      val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName)
       val dictionaryPage = decoder
         .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
       val dictionaryMap = new
           util.HashMap[DictionaryByteArrayWrapper, Integer]
       val usedDictionaryValues = util.BitSet
-        .valueOf(CompressorFactory.getInstance.getCompressor
+        .valueOf(CompressorFactory.getInstance.getCompressor(compressorName)
           .unCompressByte(local_dictionary.getDictionary_values))
       var index = 0
       var i = usedDictionaryValues.nextSetBit(0)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index 59586c0..e88d8a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
-import org.apache.carbondata.core.util.{CarbonProperties, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, DataFileFooterConverterV3}
 
 class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterAll {
 
@@ -277,16 +277,18 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
       data: Array[String]): Boolean = {
     val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
     if (null != local_dictionary) {
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
       val encodings = local_dictionary.getDictionary_meta.encoders
       val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
       val encodingFactory = DefaultEncodingFactory.getInstance
-      val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName)
       val dictionaryPage = decoder
         .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
       val dictionaryMap = new
           util.HashMap[DictionaryByteArrayWrapper, Integer]
       val usedDictionaryValues = util.BitSet
-        .valueOf(CompressorFactory.getInstance.getCompressor
+        .valueOf(CompressorFactory.getInstance.getCompressor(compressorName)
           .unCompressByte(local_dictionary.getDictionary_values))
       var index = 0
       var i = usedDictionaryValues.nextSetBit(0)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index cc8a28e..b382693 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sin
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
@@ -271,6 +272,10 @@ object StreamSinkFactory {
       getConf.get("spark.driver.host")
     carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
     carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
+    val columnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 }
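
The two-step resolution above (table-level property first, then the
system-wide default from CompressorFactory) recurs verbatim in the load,
compaction and partition commands below. A minimal sketch of that shared
logic, assuming a CarbonTable in scope (the helper name
resolveColumnCompressor is illustrative, not part of this patch):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.datastore.compression.CompressorFactory
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Prefer the table-level compressor property; otherwise fall back to
    // the system default compressor registered in CompressorFactory.
    def resolveColumnCompressor(table: CarbonTable): String = {
      table.getTableInfo.getFactTable.getTableProperties.asScala
        .getOrElse(CarbonCommonConstants.COMPRESSOR,
          CompressorFactory.getInstance().getCompressor.getName)
    }

The resolved name is then fixed on the CarbonLoadModel once per
load/compaction, so every page written by that operation uses the same
compressor.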

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 57887a7..6350b50 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -51,6 +51,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -288,6 +289,10 @@ object CarbonDataRDDFactory {
     loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
+    val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    loadModel.setColumnCompressor(columnCompressor)
     loadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 747b064..6d69eb5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.stream;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -33,6 +32,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -64,7 +64,6 @@ import org.apache.carbondata.hadoop.InputMetricsStats;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
 import org.apache.carbondata.streaming.StreamBlockletReader;
 
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +109,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   private CacheProvider cacheProvider;
   private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
   private GenericQueryType[] queryTypes;
+  private String compressorName;
 
   // vectorized reader
   private StructType outputSchema;
@@ -262,6 +262,12 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   private byte[] getSyncMarker(String filePath) throws IOException {
     CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
     FileHeader header = headerReader.readHeader();
+    // legacy store does not have this member
+    if (header.isSetCompressor_name()) {
+      compressorName = header.getCompressor_name();
+    } else {
+      compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+    }
     return header.getSync_marker();
   }
 
@@ -285,7 +291,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     FSDataInputStream fileIn = fs.open(file, bufferSize);
     fileIn.seek(fileSplit.getStart());
     input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
-        fileSplit.getStart() == 0);
+        fileSplit.getStart() == 0, compressorName);
 
     cacheProvider = CacheProvider.getInstance();
     cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e0b0547..a13dfdc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,6 +34,7 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -159,6 +160,10 @@ case class CarbonAlterTableCompactionCommand(
       carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(table.getTablePath)
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
 
       var storeLocation = System.getProperty("java.io.tmpdir")
       storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 63da404..f7a5f42 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -51,6 +51,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
@@ -206,6 +207,10 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
 
       val javaPartition = mutable.Map[String, String]()
       partition.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 807c925..6c8b0b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -123,6 +124,10 @@ case class CarbonAlterTableAddHivePartitionCommand(
             "Schema of index files located in location is not matching with current table schema")
         }
         val loadModel = new CarbonLoadModel
+        val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+          .getOrElse(CarbonCommonConstants.COMPRESSOR,
+            CompressorFactory.getInstance().getCompressor.getName)
+        loadModel.setColumnCompressor(columnCompressor)
         loadModel.setCarbonTransactionalTable(true)
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index cd26fe8..b76a485 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -145,6 +146,10 @@ case class CarbonAlterTableDropPartitionCommand(
       carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
       alterTableDropPartition(
         sparkSession.sqlContext,
         model.partitionId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index f4b6de0..753abaf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -33,8 +33,8 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -142,8 +142,12 @@ case class CarbonAlterTableSplitPartitionCommand(
         LockUsage.ALTER_PARTITION_LOCK)
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
-      val carbonLoadModel = new CarbonLoadModel()
       val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
       val tablePath = table.getTablePath
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 1beda11..42ea0bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -26,6 +27,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -99,6 +101,18 @@ case class CarbonCreateTableCommand(
         throwMetadataException(dbName, tableName, "Table should have at least one column.")
       }
 
+      // Validate the column compressor when creating the table
+      val columnCompressor = tableInfo.getFactTable.getTableProperties.get(
+        CarbonCommonConstants.COMPRESSOR)
+      try {
+        if (null != columnCompressor) {
+          CompressorFactory.getInstance().getCompressor(columnCompressor)
+        }
+      } catch {
+        case ex: UnsupportedOperationException =>
+          throw new InvalidConfigurationException(ex.getMessage)
+      }
+
       val operationContext = new OperationContext
       val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
         CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 6716707..b605a1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore
 import org.apache.carbondata.core.metadata.SegmentFileStore
@@ -87,6 +88,11 @@ with Serializable {
     val table = CarbonEnv.getCarbonTable(
       TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
     val model = new CarbonLoadModel
+    val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    model.setColumnCompressor(columnCompressor)
+
     val carbonProperty = CarbonProperties.getInstance()
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 3298009..08c149b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -70,6 +71,11 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
     }
+    import scala.collection.JavaConverters._
+    val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d98229a..060afca 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -185,6 +186,11 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
     }
+    import scala.collection.JavaConverters._
+    val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 7ef86a5..a49d5bb 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2096 is the size of carbon table
-    assertResult(2147)(result(0).getLong(1))
+    // 2187 is the size of the carbon table. Note that since 1.5.0, we additionally store the compressor name in the metadata
+    assertResult(2187)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2147)(result(1).getLong(1))
+    assertResult(2187)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 46ad32f..4d85296 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -121,6 +121,11 @@ public class CarbonDataLoadConfiguration {
 
   private String parentTablePath;
 
+  /**
+   * name of compressor to be used to compress column page
+   */
+  private String columnCompressor;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -408,4 +413,11 @@ public class CarbonDataLoadConfiguration {
     return complexNonDictionaryColumnCount;
   }
 
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 225da26..f89bc2f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -314,6 +314,7 @@ public final class DataLoadProcessBuilder {
     if (loadModel.getSdkWriterCores() > 0) {
       configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
     }
+    configuration.setColumnCompressor(loadModel.getColumnCompressor());
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 97e329d..e15fb5d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -229,6 +229,11 @@ public class CarbonLoadModel implements Serializable {
 
   private List<String> mergedSegmentIds;
 
+  /**
+   * compressor used to compress column page
+   */
+  private String columnCompressor;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -473,6 +478,7 @@ public class CarbonLoadModel implements Serializable {
     copy.loadMinSize = loadMinSize;
     copy.parentTablePath = parentTablePath;
     copy.sdkWriterCores = sdkWriterCores;
+    copy.columnCompressor = columnCompressor;
     return copy;
   }
 
@@ -529,6 +535,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.loadMinSize = loadMinSize;
     copyObj.parentTablePath = parentTablePath;
     copyObj.sdkWriterCores = sdkWriterCores;
+    copyObj.columnCompressor = columnCompressor;
     return copyObj;
   }
 
@@ -921,4 +928,12 @@ public class CarbonLoadModel implements Serializable {
   public void setSdkWriterCores(short sdkWriterCores) {
     this.sdkWriterCores = sdkWriterCores;
   }
+
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 2ebcb29..bcc904c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -29,7 +29,10 @@ import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -48,7 +51,8 @@ import org.apache.hadoop.conf.Configuration;
  */
 @InterfaceAudience.Internal
 public class CarbonLoadModelBuilder {
-
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      CarbonLoadModelBuilder.class.getName());
   private CarbonTable table;
 
   public CarbonLoadModelBuilder(CarbonTable table) {
@@ -104,6 +108,7 @@ public class CarbonLoadModelBuilder {
     } catch (NumberFormatException e) {
       throw new InvalidLoadOptionException(e.getMessage());
     }
+    validateAndSetColumnCompressor(model);
     return model;
   }
 
@@ -280,6 +285,8 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
     carbonLoadModel.setLoadMinSize(
         optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB));
+
+    validateAndSetColumnCompressor(carbonLoadModel);
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)
@@ -369,6 +376,23 @@ public class CarbonLoadModelBuilder {
     }
   }
 
+  private void validateAndSetColumnCompressor(CarbonLoadModel carbonLoadModel)
+      throws InvalidLoadOptionException {
+    try {
+      String columnCompressor = carbonLoadModel.getColumnCompressor();
+      if (StringUtils.isBlank(columnCompressor)) {
+        columnCompressor = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
+      }
+      // check and load compressor
+      CompressorFactory.getInstance().getCompressor(columnCompressor);
+      carbonLoadModel.setColumnCompressor(columnCompressor);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new InvalidLoadOptionException("Failed to load the compressor");
+    }
+  }
+
   /**
    * check whether using default value or not
    */


[4/4] carbondata git commit: [CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

Posted by ja...@apache.org.
[CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store

1. add zstd compressor for compressing column data
2. add zstd support in thrift
3. since zstd does not support zero-copy while compressing, off-heap will
not take effect for zstd
4. support lazy loading of the compressor
5. support the new compressor on legacy stores: during query we need to
decompress the column pages. Previously we got the compressor from the
system property; now that multiple compressors are supported, we read
the compressor information from the metadata in the data files (see the
sketch after this list).
6. determine the column compressor before data loading: we fetch the
column compressor before data loading/compaction starts, so that all
pages of one operation use the same compressor even if the compressor
property is modified concurrently during loading.
7. optimize the parameters for column page: pass ColumnPageEncoderMeta
instead of its individual members
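
As an illustration of item 5, the read path now derives the compressor
from the chunk metadata instead of the system property. A minimal sketch
based on the DimensionRawColumnChunk change in this patch (dataChunk3,
encodings and encoderMetas are assumed to have been read from the
blocklet beforehand):

    import org.apache.carbondata.core.datastore.compression.{Compressor, CompressorFactory}
    import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
    import org.apache.carbondata.core.util.CarbonMetadataUtil

    // The compressor name travels with each chunk's metadata, so legacy
    // (snappy) and new (zstd) stores can be queried transparently.
    val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
      dataChunk3.data_chunk_list.get(0).chunk_meta)
    val compressor: Compressor =
      CompressorFactory.getInstance().getCompressor(compressorName)
    // Decoders are likewise created with the name read from metadata.
    val decoder = DefaultEncodingFactory.getInstance.createDecoder(
      encodings, encoderMetas, compressorName)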

Support column compressor in table properties

Support specifying the column compressor in table properties while
creating a table (see the example below).
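
For example, assuming the table property key is
CarbonCommonConstants.COMPRESSOR ('carbon.column.compressor') and using
an illustrative table name:

    sql(
      s"""
         | CREATE TABLE compressor_demo (id INT, name STRING)
         | STORED BY 'carbondata'
         | TBLPROPERTIES('carbon.column.compressor'='zstd')
       """.stripMargin)

CarbonCreateTableCommand validates the property at create time and
rejects unsupported names with an InvalidConfigurationException.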

Store compressor name in metadata instead of enum

Store the compressor name in the metadata instead of an enum, which
makes the format more extensible (see the lookup sketch below).
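
Because only the name is persisted, the reader just performs a registry
lookup, and unknown names fail fast. A sketch of the lookup behavior as
the tests in this patch exercise it:

    import org.apache.carbondata.core.datastore.compression.CompressorFactory

    // Known names resolve to a Compressor instance (loaded lazily, item 4).
    val zstd = CompressorFactory.getInstance().getCompressor("zstd")
    // Unknown names raise UnsupportedOperationException with a message
    // containing "Invalid compressor type provided", which data loading
    // and compaction surface to the user instead of writing bad data.
    try {
      CompressorFactory.getInstance().getCompressor("fake")
    } catch {
      case e: UnsupportedOperationException => println(e.getMessage)
    }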

This closes #2628


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f08c4ab
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f08c4ab
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f08c4ab

Branch: refs/heads/master
Commit: 8f08c4abb5a0dc1abd4513613bcb75559ab51761
Parents: 7b31b91
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Sep 11 14:20:12 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 12 17:54:36 2018 +0800

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java     |  20 +-
 .../reader/dimension/AbstractChunkReader.java   |   3 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |   5 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |   5 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java |  14 +-
 .../measure/AbstractMeasureChunkReader.java     |   2 +
 ...CompressedMeasureChunkFileBasedReaderV1.java |   4 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |   7 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |  12 +-
 ...essedMsrChunkFileBasedPageLevelReaderV3.java |   7 +-
 .../core/datastore/compression/Compressor.java  |  13 +-
 .../compression/CompressorFactory.java          |  75 +++-
 .../datastore/compression/SnappyCompressor.java |  16 +-
 .../datastore/compression/ZstdCompressor.java   | 170 ++++++++
 .../core/datastore/page/ColumnPage.java         | 280 +++++++------
 .../core/datastore/page/ComplexColumnPage.java  |  16 +-
 .../core/datastore/page/DecimalColumnPage.java  |  61 +--
 .../page/DecoderBasedFallbackEncoder.java       |  15 +-
 .../core/datastore/page/LazyColumnPage.java     |   2 +-
 .../datastore/page/LocalDictColumnPage.java     |   5 +-
 .../datastore/page/SafeDecimalColumnPage.java   |  11 +-
 .../datastore/page/SafeFixLengthColumnPage.java |  40 +-
 .../datastore/page/SafeVarLengthColumnPage.java |  13 +-
 .../datastore/page/UnsafeDecimalColumnPage.java |  38 +-
 .../page/UnsafeFixLengthColumnPage.java         |  96 +++--
 .../page/UnsafeVarLengthColumnPage.java         |  10 +-
 .../datastore/page/VarLengthColumnPageBase.java | 160 +++++---
 .../page/encoding/ColumnPageEncoder.java        |   6 +-
 .../page/encoding/ColumnPageEncoderMeta.java    |  12 +-
 .../page/encoding/DefaultEncodingFactory.java   |  21 +-
 .../page/encoding/EncodingFactory.java          |   9 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |   9 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |   9 +-
 .../adaptive/AdaptiveFloatingCodec.java         |   9 +-
 .../adaptive/AdaptiveIntegralCodec.java         |   9 +-
 .../encoding/compress/DirectCompressCodec.java  |  95 ++---
 .../legacy/ComplexDimensionIndexCodec.java      |   8 +-
 .../legacy/DictDimensionIndexCodec.java         |   7 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   8 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   7 +-
 .../dimension/legacy/IndexStorageCodec.java     |   5 +-
 .../datastore/page/encoding/rle/RLECodec.java   |  14 +-
 .../page/encoding/rle/RLEEncoderMeta.java       |   4 +-
 .../localdictionary/PageLevelDictionary.java    |  16 +-
 .../core/metadata/datatype/DataType.java        |   1 +
 .../core/scan/executor/util/QueryUtil.java      |   8 +-
 .../core/util/BlockletDataMapUtil.java          |   7 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   2 +
 .../core/util/CarbonMetadataUtil.java           |  26 +-
 .../datastore/page/encoding/RLECodecTest.java   |   8 +-
 .../TestPageLevelDictionary.java                |  24 +-
 docs/configuration-parameters.md                |   3 +-
 format/src/main/thrift/carbondata.thrift        |   7 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |   7 +
 .../hadoop/testutil/StoreCreator.java           |   7 +
 .../presto/util/CarbonDataStoreCreator.scala    |   6 +
 .../dataload/TestLoadDataWithCompression.scala  | 411 +++++++++++++++++++
 .../TestNonTransactionalCarbonTable.scala       |   8 +-
 .../LocalDictionarySupportLoadTableTest.scala   |   8 +-
 .../streaming/StreamSinkFactory.scala           |   5 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   5 +
 .../stream/CarbonStreamRecordReader.java        |  12 +-
 .../CarbonAlterTableCompactionCommand.scala     |   5 +
 .../management/CarbonLoadDataCommand.scala      |   5 +
 ...arbonAlterTableAddHivePartitionCommand.scala |   5 +
 .../CarbonAlterTableDropPartitionCommand.scala  |   5 +
 .../CarbonAlterTableSplitPartitionCommand.scala |   8 +-
 .../table/CarbonCreateTableCommand.scala        |  14 +
 .../datasources/SparkCarbonTableFormat.scala    |   6 +
 .../spark/util/AllDictionaryTestCase.scala      |   6 +
 .../util/ExternalColumnDictionaryTestCase.scala |   6 +
 .../CarbonGetTableDetailComandTestCase.scala    |   6 +-
 .../loading/CarbonDataLoadConfiguration.java    |  12 +
 .../loading/DataLoadProcessBuilder.java         |   1 +
 .../loading/model/CarbonLoadModel.java          |  15 +
 .../loading/model/CarbonLoadModelBuilder.java   |  26 +-
 .../store/CarbonFactDataHandlerModel.java       |  11 +
 .../carbondata/processing/store/TablePage.java  |  32 +-
 .../streaming/CarbonStreamRecordWriter.java     |  37 +-
 .../streaming/StreamBlockletReader.java         |   5 +-
 .../streaming/StreamBlockletWriter.java         |   5 +-
 81 files changed, 1570 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index d645e08..8791cea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.LocalDictionaryChunk;
 
@@ -144,7 +146,11 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
     if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary
         && null == localDictionary) {
       try {
-        localDictionary = getDictionary(getDataChunkV3().local_dictionary);
+        String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+            getDataChunkV3().data_chunk_list.get(0).chunk_meta);
+
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+        localDictionary = getDictionary(getDataChunkV3().local_dictionary, compressor);
       } catch (IOException | MemoryException e) {
         throw new RuntimeException(e);
       }
@@ -160,17 +166,17 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
    * @throws IOException
    * @throws MemoryException
    */
-  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk)
-      throws IOException, MemoryException {
+  private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk,
+      Compressor compressor) throws IOException, MemoryException {
     if (null != localDictionaryChunk) {
       List<Encoding> encodings = localDictionaryChunk.getDictionary_meta().getEncoders();
       List<ByteBuffer> encoderMetas = localDictionaryChunk.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder =
-          DefaultEncodingFactory.getInstance().createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder(
+          encodings, encoderMetas, compressor.getName());
       ColumnPage decode = decoder.decode(localDictionaryChunk.getDictionary_data(), 0,
           localDictionaryChunk.getDictionary_data().length);
-      BitSet usedDictionary = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-          .unCompressByte(localDictionaryChunk.getDictionary_values()));
+      BitSet usedDictionary = BitSet.valueOf(compressor.unCompressByte(
+          localDictionaryChunk.getDictionary_values()));
       int length = usedDictionary.length();
       int index = 0;
       byte[][] dictionary = new byte[length][];

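A minimal sketch of the reader-side flow introduced in the hunk above: the compressor is no longer a process-wide singleton but is resolved per chunk from the Thrift metadata before the local dictionary is decoded. The surrounding scaffolding (a populated dataChunkV3 plus the dictionary chunk's encodings and encoderMetas lists) is assumed:

  // resolve the compressor name recorded in the chunk's metadata
  String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
      dataChunkV3.data_chunk_list.get(0).chunk_meta);
  // look up (and lazily instantiate) the matching compressor
  Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
  // the decoder and the used-dictionary bitset are decompressed with that same compressor
  ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder(
      encodings, encoderMetas, compressor.getName());
  BitSet usedDictionary = BitSet.valueOf(
      compressor.unCompressByte(localDictionaryChunk.getDictionary_values()));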
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
index 28e8741..b08f9ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.util.CarbonProperties;
 
@@ -32,7 +31,7 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
   /**
    * compressor will be used to uncompress the data
    */
-  protected static final Compressor COMPRESSOR = CompressorFactory.getInstance().getCompressor();
+  protected Compressor compressor;
 
   /**
    * size of the each column value

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index 8256b7e..3df7efb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionCo
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -55,6 +56,8 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
       final int[] eachColumnValueSize, final String filePath) {
     super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
+    // for v1 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
   /**
@@ -108,7 +111,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
     FileReader fileReader = dimensionRawColumnChunk.getFileReader();
 
     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
-    dataPage = COMPRESSOR.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
+    dataPage = compressor.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(),
         dimensionRawColumnChunk.getLength());
 
     // if row id block is present then read the row id chunk and uncompress it

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index a44d710..7d00fa4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionCo
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -47,6 +48,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
     super(blockletInfo, eachColumnValueSize, filePath);
+    // for v2 store, the compressor is snappy
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
   /**
@@ -143,7 +146,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     }
 
     // first read the data and uncompressed it
-    dataPage = COMPRESSOR
+    dataPage = compressor
         .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 8a2b74e..dc0f171 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunk
 import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -200,6 +202,9 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     // get the data buffer
     ByteBuffer rawData = rawColumnPage.getRawData();
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -214,7 +219,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
+        compressorName);
     return decoder
         .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
   }
@@ -242,7 +250,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     if (isEncodedWithMeta(pageMetadata)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
           null != rawColumnPage.getLocalDictionary());
-      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
           isEncodedWithAdaptiveMeta(pageMetadata));
     } else {
@@ -273,7 +281,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     int[] rlePage;
     int[] invertedIndexes = new int[0];
     int[] invertedIndexesReverse = new int[0];
-    dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
+    dataPage = compressor.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length);
     offset += pageMetadata.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {

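Pulled together from the V3 hunks above, the per-page decode path now threads the compressor name from each page's chunk_meta through decoder creation and null-bitset handling (pageMetadata is the page's DataChunk2; offset is assumed to be already computed):

  String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
      pageMetadata.getChunk_meta());
  Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
  // decode the page with a decoder that knows the file's compressor
  ColumnPageDecoder decoder = encodingFactory.createDecoder(
      pageMetadata.getEncoders(), pageMetadata.getEncoder_meta(), compressorName);
  ColumnPage decodedPage = decoder.decode(
      pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
  // the presence (null) bitset was compressed with the same compressor
  decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, compressor));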
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index d781cea..6774fcb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 
@@ -24,6 +25,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
  * Measure block reader abstract class
  */
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
+  protected Compressor compressor;
 
   protected EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index f0c1b75..e1bcdc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -96,7 +97,8 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     int blockIndex = measureRawColumnChunk.getColumnIndex();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         (int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
     decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 9864ab8..86083cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -46,6 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
     super(blockletInfo, filePath);
+    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
   }
 
   @Override
@@ -126,7 +128,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     copyPoint += measureColumnChunkLength.get(blockIndex);
 
     ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence));
+    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence, this.compressor));
     return page;
   }
 
@@ -137,7 +139,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     byte[] encodedMeta = encoder_meta.get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta);
-    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta);
+    ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
+        CompressorFactory.SupportedCompressor.SNAPPY.getName());
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index e389ac6..240771a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -23,11 +23,13 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -192,6 +194,9 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -199,7 +204,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
     ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }
 
@@ -210,7 +215,10 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
       throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
+        compressorName);
     return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 052f745..924a206 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -22,10 +22,12 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -138,6 +140,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
     DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // data chunk of page
     DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
+    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        pageMetadata.getChunk_meta());
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     // calculating the start point of data
     // as buffer can contain multiple column data, start point will be datachunkoffset +
     // data chunk length + page offset
@@ -147,7 +152,7 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);
 
     ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
     return decodedPage;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index 5c2a5fb..282e12c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -33,7 +33,7 @@ public interface Compressor {
 
   byte[] compressShort(short[] unCompInput);
 
-  short[] unCompressShort(byte[] compInput, int offset, int lenght);
+  short[] unCompressShort(byte[] compInput, int offset, int length);
 
   byte[] compressInt(int[] unCompInput);
 
@@ -55,5 +55,14 @@ public interface Compressor {
 
   long rawUncompress(byte[] input, byte[] output) throws IOException;
 
-  int maxCompressedLength(int inputSize);
+  long maxCompressedLength(long inputSize);
+
+  /**
+   * Whether this compressor supports zero-copy during compression.
+   * Zero-copy means that the compressor can accept a memory address (pointer)
+   * as input and return its result at a memory address (pointer).
+   * Currently not all Java versions of the compressors support this feature.
+   * @return true if zero-copy is supported, false otherwise
+   */
+  boolean supportUnsafe();
 }

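A sketch of how a caller can use the new supportUnsafe() contract, assuming it already manages off-heap buffers (inputAddress, inputSize, outputAddress and inputBytes are placeholders; exception handling omitted):

  Compressor compressor = CompressorFactory.getInstance().getCompressor();
  if (compressor.supportUnsafe()) {
    // zero-copy path: hand raw memory addresses to the native library (snappy)
    long compressedSize = compressor.rawCompress(inputAddress, inputSize, outputAddress);
  } else {
    // heap path: fall back to byte[] based compression (zstd, for now)
    byte[] compressed = compressor.compressByte(inputBytes);
  }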
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
index 18f6252..40459b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
@@ -17,25 +17,53 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 public class CompressorFactory {
-
   private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();
 
-  private final Compressor snappyCompressor;
+  private final Map<String, SupportedCompressor> compressors = new HashMap<>();
+
+  public enum SupportedCompressor {
+    SNAPPY("snappy", SnappyCompressor.class),
+    ZSTD("zstd", ZstdCompressor.class);
+
+    private String name;
+    private Class<Compressor> compressorClass;
+    private transient Compressor compressor;
+
+    SupportedCompressor(String name, Class compressorCls) {
+      this.name = name;
+      this.compressorClass = compressorCls;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * we instantiate the compressor lazily, only when it is first needed
+     */
+    public Compressor getCompressor() {
+      if (this.compressor == null) {
+        try {
+          this.compressor = compressorClass.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+          throw new RuntimeException("Exception occurred while getting compressor for " + name);
+        }
+      }
+      return this.compressor;
+    }
+  }
 
   private CompressorFactory() {
-    String compressorType = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
-    switch (compressorType) {
-      case "snappy":
-        snappyCompressor = new SnappyCompressor();
-        break;
-      default:
-        throw new RuntimeException(
-            "Invalid compressor type provided! Please provide valid compressor type");
+    for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) {
+      compressors.put(supportedCompressor.getName(), supportedCompressor);
     }
   }
 
@@ -43,16 +71,29 @@ public class CompressorFactory {
     return COMPRESSOR_FACTORY;
   }
 
+  /**
+   * Get the default compressor.
+   * This method should only be called in the data load procedure to compress column pages.
+   * In the query procedure, the compressor information should instead be read from the
+   * metadata in the data files before decompressing the content.
+   */
   public Compressor getCompressor() {
-    return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
+    String compressorType = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
+    if (!compressors.containsKey(compressorType)) {
+      throw new UnsupportedOperationException(
+          "Invalid compressor type provided! Currently we only support "
+              + Arrays.toString(SupportedCompressor.values()));
+    }
+    return getCompressor(compressorType);
   }
 
   public Compressor getCompressor(String name) {
-    if (name.equalsIgnoreCase("snappy")) {
-      return snappyCompressor;
-    } else {
-      throw new UnsupportedOperationException(name + " compressor is not supported");
+    if (compressors.containsKey(name.toLowerCase())) {
+      return compressors.get(name.toLowerCase()).getCompressor();
     }
+    throw new UnsupportedOperationException(
+        name + " compressor is not supported, currently we only support "
+            + Arrays.toString(SupportedCompressor.values()));
   }
-
 }

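Usage of the reworked factory, following the javadoc above: the no-arg lookup belongs to the load path only, while queries resolve the compressor by the name persisted in the data files. A small sketch:

  // load path: compressor chosen from the CarbonCommonConstants.COMPRESSOR property
  Compressor loadCompressor = CompressorFactory.getInstance().getCompressor();
  // query path: compressor resolved by the name read from the file's metadata
  Compressor queryCompressor = CompressorFactory.getInstance().getCompressor("zstd");
  // unknown names now fail fast, listing the supported compressors:
  // getCompressor("lzo") -> UnsupportedOperationException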
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index bd740b2..15f912a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -34,7 +34,7 @@ public class SnappyCompressor implements Compressor {
   // snappy estimate max compressed length as 32 + source_len + source_len/6
   public static final int MAX_BYTE_TO_COMPRESS = (int)((Integer.MAX_VALUE - 32) / 7.0 * 6);
 
-  private final SnappyNative snappyNative;
+  private final transient SnappyNative snappyNative;
 
   public SnappyCompressor() {
     Snappy snappy = new Snappy();
@@ -107,9 +107,9 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override public short[] unCompressShort(byte[] compInput, int offset, int lenght) {
+  @Override public short[] unCompressShort(byte[] compInput, int offset, int length) {
     try {
-      return Snappy.uncompressShortArray(compInput, offset, lenght);
+      return Snappy.uncompressShortArray(compInput, offset, length);
     } catch (IOException e) {
       LOGGER.error(e, e.getMessage());
       throw new RuntimeException(e);
@@ -196,12 +196,18 @@ public class SnappyCompressor implements Compressor {
     return snappyNative.rawCompress(inputAddress, inputSize, outputAddress);
   }
 
+  @Override
   public long rawUncompress(byte[] input, byte[] output) throws IOException {
     return snappyNative.rawUncompress(input, 0, input.length, output, 0);
   }
 
   @Override
-  public int maxCompressedLength(int inputSize) {
-    return snappyNative.maxCompressedLength(inputSize);
+  public long maxCompressedLength(long inputSize) {
+    return snappyNative.maxCompressedLength((int) inputSize);
+  }
+
+  @Override
+  public boolean supportUnsafe() {
+    return true;
   }
 }

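A note on the widened signature: maxCompressedLength now takes and returns long so that zstd's compressBound(long) fits the interface, while the snappy implementation above still narrows the input to int, so snappy callers remain bounded by Integer.MAX_VALUE. A caller sketch (inputSizeInBytes is illustrative):

  long bound = compressor.maxCompressedLength(inputSizeInBytes);
  // snappy computes the bound on int internally; zstd on the full long range
  byte[] output = new byte[(int) bound];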
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
new file mode 100644
index 0000000..914c3e7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+import com.github.luben.zstd.Zstd;
+
+public class ZstdCompressor implements Compressor {
+  private static final int COMPRESS_LEVEL = 3;
+
+  public ZstdCompressor() {
+  }
+
+  @Override
+  public String getName() {
+    return "zstd";
+  }
+
+  @Override
+  public byte[] compressByte(byte[] unCompInput) {
+    return Zstd.compress(unCompInput, COMPRESS_LEVEL);
+  }
+
+  @Override
+  public byte[] compressByte(byte[] unCompInput, int byteSize) {
+    return Zstd.compress(unCompInput, COMPRESS_LEVEL);
+  }
+
+  @Override
+  public byte[] unCompressByte(byte[] compInput) {
+    long decompressedSize = Zstd.decompressedSize(compInput);
+    return Zstd.decompress(compInput, (int) decompressedSize);
+  }
+
+  @Override
+  public byte[] unCompressByte(byte[] compInput, int offset, int length) {
+    // TODO: find a way to avoid this extra memory copy
+    byte[] dstBytes = new byte[length];
+    System.arraycopy(compInput, offset, dstBytes, 0, length);
+    return unCompressByte(dstBytes);
+  }
+
+  @Override
+  public byte[] compressShort(short[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT);
+    unCompBuffer.asShortBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public short[] unCompressShort(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    ShortBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asShortBuffer();
+    short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT];
+    unCompBuffer.get(shorts);
+    return shorts;
+  }
+
+  @Override
+  public byte[] compressInt(int[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT);
+    unCompBuffer.asIntBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public int[] unCompressInt(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    IntBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asIntBuffer();
+    int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT];
+    unCompBuffer.get(ints);
+    return ints;
+  }
+
+  @Override
+  public byte[] compressLong(long[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG);
+    unCompBuffer.asLongBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public long[] unCompressLong(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    LongBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asLongBuffer();
+    long[] longs = new long[unCompArray.length / ByteUtil.SIZEOF_LONG];
+    unCompBuffer.get(longs);
+    return longs;
+  }
+
+  @Override
+  public byte[] compressFloat(float[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT);
+    unCompBuffer.asFloatBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public float[] unCompressFloat(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    FloatBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asFloatBuffer();
+    float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT];
+    unCompBuffer.get(floats);
+    return floats;
+  }
+
+  @Override
+  public byte[] compressDouble(double[] unCompInput) {
+    ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
+    unCompBuffer.asDoubleBuffer().put(unCompInput);
+    return compressByte(unCompBuffer.array());
+  }
+
+  @Override
+  public double[] unCompressDouble(byte[] compInput, int offset, int length) {
+    byte[] unCompArray = unCompressByte(compInput, offset, length);
+    DoubleBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asDoubleBuffer();
+    double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE];
+    unCompBuffer.get(doubles);
+    return doubles;
+  }
+
+  @Override
+  public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException {
+    throw new RuntimeException("Not implemented rawCompress for zstd yet");
+  }
+
+  @Override
+  public long rawUncompress(byte[] input, byte[] output) throws IOException {
+    return Zstd.decompress(output, input);
+  }
+
+  @Override
+  public long maxCompressedLength(long inputSize) {
+    return Zstd.compressBound(inputSize);
+  }
+
+  /**
+   * Currently the Java version of zstd does not support this feature.
+   * It may be supported in the upcoming release 1.3.5-3; then we can optimize this accordingly.
+   */
+  @Override
+  public boolean supportUnsafe() {
+    return false;
+  }
+}

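A self-contained round trip through the new ZstdCompressor via the Compressor interface (a sketch; imports of java.nio.charset.StandardCharsets and java.util.Arrays assumed):

  Compressor zstd = CompressorFactory.SupportedCompressor.ZSTD.getCompressor();
  byte[] raw = "carbondata zstd demo".getBytes(StandardCharsets.UTF_8);
  byte[] compressed = zstd.compressByte(raw);
  // zstd records the decompressed size in its frame header,
  // which unCompressByte uses to size the output array
  byte[] restored = zstd.unCompressByte(compressed);
  assert Arrays.equals(raw, restored);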
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 5b42735..796083d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -51,11 +51,7 @@ public abstract class ColumnPage {
   // number of row in this page
   protected int pageSize;
 
-  // data type of the page storage
-  protected final DataType dataType;
-
-  // specification of this column
-  private final TableSpec.ColumnSpec columnSpec;
+  protected ColumnPageEncoderMeta columnPageEncoderMeta;
 
   // The index of the rowId whose value is null, will be set to 1
   protected BitSet nullBitSet;
@@ -70,15 +66,14 @@ public abstract class ColumnPage {
   /**
    * Create a new column page with input data type and page size.
    */
-  protected ColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    this.columnSpec = columnSpec;
-    this.dataType = dataType;
+  protected ColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
     this.pageSize = pageSize;
     this.nullBitSet = new BitSet(pageSize);
+    this.columnPageEncoderMeta = columnPageEncoderMeta;
   }
 
   public DataType getDataType() {
-    return dataType;
+    return columnPageEncoderMeta.getStoreDataType();
   }
 
   public SimpleStatsResult getStatistics() {
@@ -93,102 +88,112 @@ public abstract class ColumnPage {
     this.statsCollector = statsCollector;
   }
 
-  private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+  private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
     if (unsafe) {
       try {
-        return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+        return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeDecimalColumnPage(columnSpec, dataType, pageSize);
+      return new SafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
     }
   }
 
-  private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+  private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
     if (unsafe) {
       try {
-        return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+        return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+      return new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
     }
   }
 
-  private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
-      int pageSize) {
+  private static ColumnPage createFixLengthPage(
+      ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
     if (unsafe) {
       try {
-        return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+        return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+      return new SafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
     }
   }
 
-  private static ColumnPage createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec,
-      DataType dataType, int pageSize, int eachValueSize) {
+  private static ColumnPage createFixLengthByteArrayPage(
+      ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) {
     if (unsafe) {
       try {
-        return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize);
+        return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }
     } else {
-      return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+      return new SafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
     }
   }
 
-  private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
-      int pageSize) {
-    if (DataTypes.isDecimal(dataType)) {
-      return createDecimalPage(columnSpec, dataType, pageSize);
-    } else if (dataType.equals(BYTE_ARRAY)) {
-      return createVarLengthPage(columnSpec, dataType, pageSize);
+  private static ColumnPage createPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) {
+      return createDecimalPage(columnPageEncoderMeta, pageSize);
+    } else if (columnPageEncoderMeta.getStoreDataType().equals(BYTE_ARRAY)) {
+      return createVarLengthPage(columnPageEncoderMeta, pageSize);
     } else {
-      return createFixLengthPage(columnSpec, dataType, pageSize);
+      return createFixLengthPage(columnPageEncoderMeta, pageSize);
     }
   }
 
-  public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
-      int pageSize)
-    throws MemoryException {
-    return newPage(columnSpec, dataType, pageSize);
+  public static ColumnPage newDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
+      int pageSize) throws MemoryException {
+    return newPage(columnPageEncoderMeta, pageSize);
   }
 
-  public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+  public static ColumnPage newLocalDictPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize, LocalDictionaryGenerator localDictionaryGenerator,
       boolean isComplexTypePrimitive) throws MemoryException {
     boolean isDecoderBasedFallBackEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK,
             CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
+    ColumnPage actualPage;
+    ColumnPage encodedPage;
     if (unsafe) {
-      return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize),
-          new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize,
-              CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), localDictionaryGenerator,
-          isComplexTypePrimitive, isDecoderBasedFallBackEnabled);
+      actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
+      encodedPage = new UnsafeFixLengthColumnPage(
+          new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
+              columnPageEncoderMeta.getCompressorName()),
+          pageSize,
+          CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE);
     } else {
-      return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize),
-          new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
-          localDictionaryGenerator, isComplexTypePrimitive, isDecoderBasedFallBackEnabled);
+      actualPage = new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
+      encodedPage = new SafeFixLengthColumnPage(
+          new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
+              columnPageEncoderMeta.getCompressorName()),
+          pageSize);
     }
+    return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator,
+        isComplexTypePrimitive, isDecoderBasedFallBackEnabled);
   }
 
   /**
    * Create a new page of dataType and number of row = pageSize
    */
-  public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
-      int pageSize) throws MemoryException {
+  public static ColumnPage newPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
+      throws MemoryException {
     ColumnPage instance;
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
+    TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec();
+    String compressorName = columnPageEncoderMeta.getCompressorName();
     if (unsafe) {
       if (dataType == DataTypes.BOOLEAN) {
-        instance = new UnsafeFixLengthColumnPage(columnSpec, BYTE, pageSize);
+        instance = new UnsafeFixLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize);
       } else if (dataType == DataTypes.BYTE ||
           dataType == DataTypes.SHORT ||
           dataType == DataTypes.SHORT_INT ||
@@ -196,39 +201,43 @@ public abstract class ColumnPage {
           dataType == DataTypes.LONG ||
           dataType == DataTypes.FLOAT ||
           dataType == DataTypes.DOUBLE) {
-        instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeFixLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else if (dataType == DataTypes.TIMESTAMP) {
-        instance = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.LONG, pageSize);
+        instance = new UnsafeFixLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, DataTypes.LONG, compressorName), pageSize);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeDecimalColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
           || dataType == DataTypes.VARCHAR) {
-        instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeVarLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
     } else {
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-        instance = newBytePage(columnSpec, new byte[pageSize]);
+        instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
       } else if (dataType == DataTypes.SHORT) {
-        instance = newShortPage(columnSpec, new short[pageSize]);
+        instance = newShortPage(columnSpec, new short[pageSize], compressorName);
       } else if (dataType == DataTypes.SHORT_INT) {
-        instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
+        instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
       } else if (dataType == DataTypes.INT) {
-        instance = newIntPage(columnSpec, new int[pageSize]);
+        instance = newIntPage(columnSpec, new int[pageSize], compressorName);
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
-        instance = newLongPage(columnSpec, new long[pageSize]);
+        instance = newLongPage(columnSpec, new long[pageSize], compressorName);
       } else if (dataType == DataTypes.FLOAT) {
-        instance = newFloatPage(columnSpec, new float[pageSize]);
+        instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
       } else if (dataType == DataTypes.DOUBLE) {
-        instance = newDoublePage(columnSpec, new double[pageSize]);
+        instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = newDecimalPage(columnSpec, new byte[pageSize][]);
+        instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
           || dataType == DataTypes.VARCHAR) {
-        instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
@@ -236,83 +245,103 @@ public abstract class ColumnPage {
     return instance;
   }
 
-  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
-    ColumnPage columnPage = createPage(columnSpec, BYTE_ARRAY, byteArray.length);
+  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, BYTE_ARRAY, compressorName), byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData) {
-    ColumnPage columnPage = createPage(columnSpec, BYTE, byteData.length);
+  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData) {
-    ColumnPage columnPage = createPage(columnSpec, SHORT, shortData.length);
+  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData) {
-    ColumnPage columnPage = createPage(columnSpec, SHORT_INT, shortIntData.length / 3);
+  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
-  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData) {
-    ColumnPage columnPage = createPage(columnSpec, INT, intData.length);
+  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData) {
-    ColumnPage columnPage = createPage(columnSpec, LONG, longData.length);
+  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData) {
-    ColumnPage columnPage = createPage(columnSpec, FLOAT, floatData.length);
+  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData) {
-    ColumnPage columnPage = createPage(columnSpec, DOUBLE, doubleData.length);
+  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
-    ColumnPage columnPage =
-        createPage(columnSpec, columnSpec.getSchemaDataType(), byteArray.length);
+  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
   private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray) throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(columnSpec, lvEncodedByteArray);
+      byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(
+        columnSpec, lvEncodedByteArray, compressorName);
   }
 
   private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
-    return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+      byte[] lvEncodedByteArray, int lvLength, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newLVBytesColumnPage(
+        columnSpec, lvEncodedByteArray, lvLength, compressorName);
   }
 
   private static ColumnPage newComplexLVBytesPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
-    return VarLengthColumnPageBase
-        .newComplexLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+      byte[] lvEncodedByteArray, int lvLength, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newComplexLVBytesColumnPage(
+        columnSpec, lvEncodedByteArray, lvLength, compressorName);
   }
 
   private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int eachValueSize) throws MemoryException {
+      byte[] lvEncodedByteArray, int eachValueSize, String compressorName) throws MemoryException {
     int pageSize = lvEncodedByteArray.length / eachValueSize;
-    ColumnPage fixLengthByteArrayPage =
-        createFixLengthByteArrayPage(columnSpec, columnSpec.getSchemaDataType(), pageSize,
-            eachValueSize);
+    ColumnPage fixLengthByteArrayPage = createFixLengthByteArrayPage(
+        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        pageSize, eachValueSize);
     byte[] data = null;
     int offset = 0;
     for (int i = 0; i < pageSize; i++) {
@@ -379,8 +408,9 @@ public abstract class ColumnPage {
       nullBitSet.set(rowId);
       return;
     }
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         value = BooleanConvert.boolean2Byte((Boolean) value);
       }
       putByte(rowId, (byte) value);
@@ -419,9 +449,10 @@ public abstract class ColumnPage {
     if (nullBitSet.get(rowId)) {
       return getNull(rowId);
     }
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       byte value = getByte(rowId);
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         return BooleanConvert.byte2Boolean(value);
       }
       return value;
@@ -501,6 +532,7 @@ public abstract class ColumnPage {
    * Set null at rowId
    */
   protected void putNull(int rowId) {
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       putBoolean(rowId, false);
     } else if (dataType == DataTypes.BYTE) {
@@ -525,11 +557,12 @@ public abstract class ColumnPage {
    */
   private Object getNull(int rowId) {
     Object result;
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       result = getBoolean(rowId);
     } else if (dataType == DataTypes.BYTE) {
       result = getByte(rowId);
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         result = BooleanConvert.byte2Boolean((byte)result);
       }
     } else if (dataType == DataTypes.SHORT) {
@@ -679,10 +712,12 @@ public abstract class ColumnPage {
   public PageLevelDictionary getPageDictionary() {
     throw new UnsupportedOperationException("Operation Not Supported");
   }
+
   /**
    * Compress page data using specified compressor
    */
   public byte[] compress(Compressor compressor) throws MemoryException, IOException {
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       return compressor.compressByte(getBooleanPage());
     } else if (dataType == DataTypes.BYTE) {
@@ -702,13 +737,13 @@ public abstract class ColumnPage {
     } else if (DataTypes.isDecimal(dataType)) {
       return compressor.compressByte(getDecimalPage());
     } else if (dataType == DataTypes.BYTE_ARRAY
-        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+        && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
       return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
-    } else if (dataType == DataTypes.BYTE_ARRAY && (
-        columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
-            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY
-            || columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE
-            || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
+    } else if (dataType == DataTypes.BYTE_ARRAY
+        && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_LONG_VALUE
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
@@ -729,51 +764,54 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      return newBytePage(columnSpec, byteData);
+      return newBytePage(columnSpec, byteData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      return newShortPage(columnSpec, shortData);
+      return newShortPage(columnSpec, shortData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      return newShortIntPage(columnSpec, shortIntData);
+      return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      return newIntPage(columnSpec, intData);
+      return newIntPage(columnSpec, intData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      return newLongPage(columnSpec, longData);
+      return newLongPage(columnSpec, longData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.FLOAT) {
       float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-      return newFloatPage(columnSpec, floatData);
+      return newFloatPage(columnSpec, floatData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-      return newDoublePage(columnSpec, doubleData);
+      return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
     } else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
             || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newComplexLVBytesPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY &&
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newFixedByteArrayPage(columnSpec, lvVarBytes, 3);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes, 3, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
         && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newFixedByteArrayPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
         && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.LONG_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
         && columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      return newLVBytesPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      return newLVBytesPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else {
       throw new UnsupportedOperationException(
           "unsupport uncompress column page: " + meta.getStoreDataType());
@@ -791,32 +829,32 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), byteData.length);
+      decimalPage = createDecimalPage(meta, byteData.length);
       decimalPage.setBytePage(byteData);
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortData.length);
+      decimalPage = createDecimalPage(meta, shortData.length);
       decimalPage.setShortPage(shortData);
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortIntData.length);
+      decimalPage = createDecimalPage(meta, shortIntData.length);
       decimalPage.setShortIntPage(shortIntData);
       return decimalPage;
     }  else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), intData.length);
+      decimalPage = createDecimalPage(meta, intData.length);
       decimalPage.setIntPage(intData);
       return decimalPage;
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), longData.length);
+      decimalPage = createDecimalPage(meta, longData.length);
       decimalPage.setLongPage(longData);
       return decimalPage;
     } else {
       byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newDecimalPage(columnSpec, lvEncodedBytes);
+      return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
     }
   }
 
@@ -829,7 +867,7 @@ public abstract class ColumnPage {
   }
 
   public TableSpec.ColumnSpec getColumnSpec() {
-    return columnSpec;
+    return columnPageEncoderMeta.getColumnSpec();
   }
 
   public boolean isLocalDictGeneratedPage() {
@@ -847,4 +885,12 @@ public abstract class ColumnPage {
   public int getActualRowCount() {
     throw new UnsupportedOperationException("Operation not supported");
   }
+
+  public String getColumnCompressorName() {
+    return columnPageEncoderMeta.getCompressorName();
+  }
+
+  public ColumnPageEncoderMeta getColumnPageEncoderMeta() {
+    return columnPageEncoderMeta;
+  }
 }

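The refactor running through this whole patch is visible in the hunk above: instead of passing (columnSpec, dataType) pairs and implicitly using the system default compressor, every page factory now receives a ColumnPageEncoderMeta that bundles the column spec, the store data type and the compressor name, so the codec choice travels with the page itself. A minimal standalone sketch of that pattern (simplified stand-in types, not CarbonData's real classes):

    import java.util.Objects;

    // Simplified stand-ins: CarbonData's real ColumnPageEncoderMeta carries more state.
    final class PageMeta {
      private final String fieldName;      // stands in for TableSpec.ColumnSpec
      private final String storeDataType;  // stands in for DataType
      private final String compressorName;

      PageMeta(String fieldName, String storeDataType, String compressorName) {
        this.fieldName = Objects.requireNonNull(fieldName);
        this.storeDataType = Objects.requireNonNull(storeDataType);
        this.compressorName = Objects.requireNonNull(compressorName);
      }

      String getStoreDataType() { return storeDataType; }
      String getCompressorName() { return compressorName; }
    }

    public class MetaThreadingSketch {
      // Before: newBytePage(spec, BYTE, length) with an implicit global compressor.
      // After: the meta object travels with the page; one argument carries all three.
      static String newBytePage(PageMeta meta, byte[] data) {
        return String.format("page[%s, %d rows, compressor=%s]",
            meta.getStoreDataType(), data.length, meta.getCompressorName());
      }

      public static void main(String[] args) {
        PageMeta meta = new PageMeta("salary", "BYTE", "zstd");
        System.out.println(newBytePage(meta, new byte[]{1, 2, 3}));
      }
    }
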
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index a7f94e2..921ae50 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.row.ComplexColumnInfo;
@@ -71,8 +72,8 @@ public class ComplexColumnPage {
    * @throws MemoryException
    * if memory is not sufficient
    */
-  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize)
-      throws MemoryException {
+  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize,
+      String columnCompressor) throws MemoryException {
     DataType dataType;
     for (int i = 0; i < this.columnPages.length; i++) {
       LocalDictionaryGenerator localDictionaryGenerator =
@@ -83,15 +84,18 @@ public class ComplexColumnPage {
         if (isColumnPageBasedOnDataType(i)) {
           // no dictionary primitive types need adaptive encoding,
           // hence store as actual value instead of byte array
-          this.columnPages[i] = ColumnPage.newPage(spec, dataType, pageSize);
+          this.columnPages[i] = ColumnPage.newPage(
+              new ColumnPageEncoderMeta(spec, dataType, columnCompressor), pageSize);
           this.columnPages[i].setStatsCollector(PrimitivePageStatsCollector.newInstance(dataType));
         } else {
-          this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+          this.columnPages[i] = ColumnPage.newPage(
+              new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize);
           this.columnPages[i].setStatsCollector(new DummyStatsCollector());
         }
       } else {
-        this.columnPages[i] = ColumnPage
-            .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator, true);
+        this.columnPages[i] = ColumnPage.newLocalDictPage(
+            new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize,
+            localDictionaryGenerator, true);
         this.columnPages[i].setStatsCollector(new DummyStatsCollector());
       }
     }

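The ComplexColumnPage change above threads the same compressor name into every child page of a complex column, whichever of the three page flavors a child ends up with. A toy version of that per-child decision, with the branch conditions reduced to booleans (names are illustrative only):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical miniature of ComplexColumnPage.initialize: primitive children
    // that can be adaptively encoded get a typed page, everything else a
    // BYTE_ARRAY page, and dictionary-enabled children a local-dict page. The
    // compressor name is passed down in every branch.
    public class ComplexInitSketch {
      enum PageKind { TYPED, BYTE_ARRAY, LOCAL_DICT }

      static PageKind choosePage(boolean hasLocalDict, boolean adaptivePrimitive) {
        if (hasLocalDict) return PageKind.LOCAL_DICT;
        return adaptivePrimitive ? PageKind.TYPED : PageKind.BYTE_ARRAY;
      }

      public static void main(String[] args) {
        String compressor = "zstd"; // would be threaded into every ColumnPageEncoderMeta
        Map<String, PageKind> children = new LinkedHashMap<>();
        children.put("struct.intChild", choosePage(false, true));
        children.put("struct.stringChild", choosePage(true, false));
        children.put("array.lengthChild", choosePage(false, false));
        children.forEach((name, kind) ->
            System.out.println(name + " -> " + kind + " (compressor=" + compressor + ")"));
      }
    }
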
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
index 368a289..e63614f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
@@ -34,10 +33,11 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
    */
   DecimalConverterFactory.DecimalConverter decimalConverter;
 
-  DecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
-    decimalConverter = DecimalConverterFactory.INSTANCE
-        .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+  DecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+    decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(
+        columnPageEncoderMeta.getColumnSpec().getPrecision(),
+        columnPageEncoderMeta.getColumnSpec().getScale());
   }
 
   public DecimalConverterFactory.DecimalConverter getDecimalConverter() {
@@ -46,67 +46,80 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[] getBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short[] getShortPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getShortIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int[] getIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long[] getLongPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float[] getFloatPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double[] getDoublePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[][] getByteArrayPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setFloatPage(float[] floatData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setDoublePage(double[] doubleData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   // used for building datamap in loading process
@@ -127,15 +140,15 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
 
   private BigDecimal getDecimalFromDecompressData(int rowId) {
     long value;
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       value = getByte(rowId);
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       value = getShort(rowId);
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       value = getShortInt(rowId);
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       value = getInt(rowId);
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       value = getLong(rowId);
     } else {
       return decimalConverter.getDecimal(getBytes(rowId));


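For context on the decimal path above: when a decimal's unscaled value fits an integral store type, the page keeps it as BYTE/SHORT/SHORT_INT/INT/LONG and getDecimalFromDecompressData rebuilds the BigDecimal from the stored value plus the column's scale; wider values fall through to the byte-array representation handled by the decimal converter. A two-method sketch of that reconstruction:

    import java.math.BigDecimal;

    // Illustrates the idea behind getDecimalFromDecompressData; names here are
    // illustrative, not CarbonData's API.
    public class DecimalRebuildSketch {
      static BigDecimal rebuild(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
      }

      public static void main(String[] args) {
        // 12345 stored as an INT with scale 2 decodes to 123.45
        System.out.println(rebuild(12345L, 2));
      }
    }
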
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
index d85d6cd..9bed89f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.Encoding;
 
@@ -65,7 +64,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
     int[] rlePage;
 
     // uncompress the encoded column page
-    byte[] bytes = CompressorFactory.getInstance().getCompressor()
+    byte[] bytes = CompressorFactory.getInstance().getCompressor(
+        encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName())
         .unCompressByte(encodedColumnPage.getEncodedData().array(), offset,
             encodedColumnPage.getPageMetadata().data_page_length);
 
@@ -94,15 +94,10 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
     // disable encoding using local dictionary
     encodedColumnPage.getActualPage().disableLocalDictEncoding();
 
-    // get column spec for existing column page
-    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
-
-    // get the dataType of column
-    DataType dataType = encodedColumnPage.getActualPage().getDataType();
-
     // create a new column page which will have actual data instead of encoded data
     ColumnPage actualDataColumnPage =
-        ColumnPage.newPage(columnSpec, dataType, encodedColumnPage.getActualPage().getPageSize());
+        ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(),
+            encodedColumnPage.getActualPage().getPageSize());
 
     // uncompressed data from encoded column page is dictionary data, get the dictionary data using
     // keygenerator
@@ -120,6 +115,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu
           .putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray));
     }
 
+    // get column spec for existing column page
+    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
     FallbackEncodedColumnPage fallBackEncodedColumnPage =
         CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec);
     // here freeing the memory of new column page created as fallback is done and

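The fallback-encoder change above is the reader-side half of the feature: the decoder asks CompressorFactory for the compressor recorded in the page's own meta rather than for the global default, which is what lets segments written with different codecs coexist. A toy registry in that spirit (the interface and entries below are stand-ins, not CarbonData's API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class CompressorRegistrySketch {
      interface Codec { byte[] unCompressByte(byte[] in, int offset, int length); }

      private static final Map<String, Codec> CODECS = new ConcurrentHashMap<>();
      static {
        // Pass-through codecs standing in for real snappy/zstd implementations.
        Codec passThrough = (in, off, len) -> java.util.Arrays.copyOfRange(in, off, off + len);
        CODECS.put("snappy", passThrough);
        CODECS.put("zstd", passThrough);
      }

      static Codec getCompressor(String name) {
        Codec codec = CODECS.get(name.toLowerCase());
        if (codec == null) throw new IllegalArgumentException("unknown compressor: " + name);
        return codec;
      }

      public static void main(String[] args) {
        byte[] data = {10, 20, 30};
        System.out.println(getCompressor("zstd").unCompressByte(data, 1, 2).length); // 2
      }
    }
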
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 255e078..605fe4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -37,7 +37,7 @@ public class LazyColumnPage extends ColumnPage {
   private ColumnPageValueConverter converter;
 
   private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) {
-    super(columnPage.getColumnSpec(), columnPage.getDataType(), columnPage.getPageSize());
+    super(columnPage.getColumnPageEncoderMeta(), columnPage.getPageSize());
     this.columnPage = columnPage;
     this.converter = converter;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index 904d7ef..fced016 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -76,14 +76,13 @@ public class LocalDictColumnPage extends ColumnPage {
   protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
       LocalDictionaryGenerator localDictionaryGenerator, boolean isComplexTypePrimitive,
       boolean isDecoderBasedFallBackEnabled) {
-    super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
-        actualDataColumnPage.getPageSize());
+    super(actualDataColumnPage.getColumnPageEncoderMeta(), actualDataColumnPage.getPageSize());
     // if threshold is not reached then create page level dictionary
     // for encoding with local dictionary
     if (!localDictionaryGenerator.isThresholdReached()) {
       pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
           actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType(),
-          isComplexTypePrimitive);
+          isComplexTypePrimitive, actualDataColumnPage.getColumnCompressorName());
       this.encodedDataColumnPage = encodedColumnpage;
       this.keyGenerator = KeyGeneratorFactory
           .getKeyGenerator(new int[] { CarbonCommonConstants.LOCAL_DICTIONARY_MAX + 1 });

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index 89ac4a4..d3e945d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -36,8 +35,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
   private byte[] shortIntData;
   private byte[][] byteArrayData;
 
-  SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     byteArrayData = new byte[pageSize][];
   }
 
@@ -189,8 +188,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
         }
         break;
       default:
-        throw new UnsupportedOperationException(
-            "not support value conversion on " + dataType + " page");
+        throw new UnsupportedOperationException("not support value conversion on "
+            + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 82f1510..b355220 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -45,8 +45,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   // total number of entries in array
   private int arrayElementCount = 0;
 
-  SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     this.fixedLengthdata = new byte[pageSize][];
   }
 
@@ -120,17 +120,20 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getDecimalPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**
@@ -190,7 +193,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -267,7 +271,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getLVFlattenedBytePage() throws IOException {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -345,7 +350,8 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -366,33 +372,33 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, byteData[i]);
       }
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, shortData[i]);
       }
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, intData[i]);
       }
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, longData[i]);
       }
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, floatData[i]);
       }
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, doubleData[i]);
       }
     } else {
-      throw new UnsupportedOperationException("not support value conversion on " +
-          dataType + " page");
+      throw new UnsupportedOperationException("not support value conversion on "
+          + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

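The convertValue rewrite above is purely mechanical (dataType becomes columnPageEncoderMeta.getStoreDataType()), but the dispatch shape is worth seeing in isolation: the page walks whichever typed backing array is live for its store data type and feeds each value to a converter callback. A condensed sketch with simplified types:

    // Only INT and LONG branches are shown; the real page covers all fixed types.
    public class ConvertValueSketch {
      interface ValueConverter { void encode(int rowId, long value); }

      static void convert(String storeDataType, long[] longData, int[] intData,
          int count, ValueConverter codec) {
        if ("INT".equals(storeDataType)) {
          for (int i = 0; i < count; i++) codec.encode(i, intData[i]);
        } else if ("LONG".equals(storeDataType)) {
          for (int i = 0; i < count; i++) codec.encode(i, longData[i]);
        } else {
          throw new UnsupportedOperationException(
              "not support value conversion on " + storeDataType + " page");
        }
      }

      public static void main(String[] args) {
        convert("INT", null, new int[]{7, 8}, 2,
            (rowId, v) -> System.out.println("row " + rowId + " -> " + v));
      }
    }
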
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 274b8a7..9b47e86 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -24,16 +24,15 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   // for string and decimal data
   private List<byte[]> byteArrayData;
 
-  SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
+  SafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
     byteArrayData = new ArrayList<>();
   }
 
@@ -54,12 +53,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 96aeac2..829fad4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
@@ -32,36 +31,35 @@ import org.apache.carbondata.core.util.ByteUtil;
  */
 public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
-  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
-    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
-    initMemory();
+    this(columnPageEncoderMeta, pageSize, (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR));
   }
 
-  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      int capacity) throws MemoryException {
-    super(columnSpec, dataType, pageSize);
+  UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int capacity)
+      throws MemoryException {
+    super(columnPageEncoderMeta, pageSize);
     this.capacity = capacity;
     initMemory();
   }
 
   private void initMemory() throws MemoryException {
-    if (dataType == DataTypes.BYTE ||
-        dataType == DataTypes.SHORT ||
-        dataType == DataTypes.INT ||
-        dataType == DataTypes.LONG) {
-      int size = pageSize << dataType.getSizeBits();
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
+      int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-    } else if (DataTypes.isDecimal(dataType)) {
+    } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
     baseAddress = memoryBlock.getBaseObject();
     baseOffset = memoryBlock.getBaseOffset();
@@ -255,8 +253,8 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
         }
         break;
       default:
-        throw new UnsupportedOperationException(
-            "not support value conversion on " + dataType + " page");
+        throw new UnsupportedOperationException("not support value conversion on "
+            + columnPageEncoderMeta.getStoreDataType() + " page");
     }
   }
 

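The sizing logic touched above follows one rule: fixed-width types allocate pageSize << sizeBits bytes, while SHORT_INT, a packed 3-byte integer with no power-of-two width, is sized as pageSize * 3. A self-contained sketch of that arithmetic:

    public class PageSizingSketch {
      static long allocationSize(String type, int pageSize) {
        switch (type) {
          case "BYTE":      return (long) pageSize << 0; // 1 byte per row
          case "SHORT":     return (long) pageSize << 1; // 2 bytes per row
          case "INT":       return (long) pageSize << 2; // 4 bytes per row
          case "LONG":      return (long) pageSize << 3; // 8 bytes per row
          case "SHORT_INT": return pageSize * 3L;        // 3 bytes, no shift possible
          default: throw new UnsupportedOperationException("invalid data type: " + type);
        }
      }

      public static void main(String[] args) {
        System.out.println(allocationSize("LONG", 32000));      // 256000
        System.out.println(allocationSize("SHORT_INT", 32000)); // 96000
      }
    }
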
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index f75deb6..8a53840 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -20,13 +20,12 @@ package org.apache.carbondata.core.datastore.page;
 import java.io.IOException;
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -61,40 +60,41 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   private static final int floatBits = DataTypes.FLOAT.getSizeBits();
   private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();
 
-  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
-    if (dataType == DataTypes.BOOLEAN ||
-        dataType == DataTypes.BYTE ||
-        dataType == DataTypes.SHORT ||
-        dataType == DataTypes.INT ||
-        dataType == DataTypes.LONG ||
-        dataType == DataTypes.FLOAT ||
-        dataType == DataTypes.DOUBLE) {
-      int size = pageSize << dataType.getSizeBits();
+    super(columnPageEncoderMeta, pageSize);
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BOOLEAN ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.INT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
+      int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits();
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
       capacity = size;
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
       capacity = size;
-    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+    } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType()) ||
+        columnPageEncoderMeta.getStoreDataType() == DataTypes.STRING) {
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
     totalLength = 0;
   }
 
-  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      int eachRowSize)
-      throws MemoryException {
-    this(columnSpec, dataType, pageSize);
+  UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize,
+      int eachRowSize) throws MemoryException {
+    this(columnPageEncoderMeta, pageSize);
     this.eachRowSize = eachRowSize;
     totalLength = 0;
-    if (dataType == DataTypes.BYTE_ARRAY) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       memoryBlock =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
       baseAddress = memoryBlock.getBaseObject();
@@ -217,11 +217,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -272,7 +274,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -288,7 +291,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override public byte[] getDecimalPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -375,7 +379,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getLVFlattenedBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override public byte[] getComplexChildrenLVFlattenedBytePage() {
@@ -441,7 +446,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   public void freeMemory() {
@@ -455,68 +461,70 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override public void convertValue(ColumnPageValueConverter codec) {
     int endLoop = getEndLoop();
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << byteBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << shortBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << intBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << longBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << floatBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
       }
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       for (long i = 0; i < endLoop; i++) {
         long offset = i << doubleBits;
         codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
       }
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
   }
 
   private int getEndLoop() {
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       return totalLength / ByteUtil.SIZEOF_BYTE;
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       return totalLength / ByteUtil.SIZEOF_SHORT;
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       return totalLength / ByteUtil.SIZEOF_SHORT_INT;
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       return totalLength / ByteUtil.SIZEOF_INT;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       return totalLength / ByteUtil.SIZEOF_LONG;
-    } else if (dataType == DataTypes.FLOAT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) {
       return totalLength / DataTypes.FLOAT.getSizeInBytes();
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) {
       return totalLength / DataTypes.DOUBLE.getSizeInBytes();
-    } else if (dataType == DataTypes.BYTE_ARRAY) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) {
       return totalLength / eachRowSize;
     } else {
-      throw new UnsupportedOperationException("invalid data type: " + dataType);
+      throw new UnsupportedOperationException(
+          "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
     }
   }
 
   @Override public byte[] compress(Compressor compressor) throws MemoryException, IOException {
-    if (UnsafeMemoryManager.isOffHeap()) {
+    if (UnsafeMemoryManager.isOffHeap() && compressor.supportUnsafe()) {
       // use raw compression and copy to byte[]
       int inputSize = totalLength;
-      int compressedMaxSize = compressor.maxCompressedLength(inputSize);
+      long compressedMaxSize = compressor.maxCompressedLength(inputSize);
       MemoryBlock compressed =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
       long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());

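The last hunk above carries the one behavioral change in this file: the raw off-heap compression path is now taken only when the codec also reports supportUnsafe(), since a binding without a native raw-compress entry point (presumably the newly added zstd one) must go through a heap byte[] copy instead; maxCompressedLength's result is also widened to long. A sketch of that guard, with a stand-in codec interface:

    public class OffHeapCompressSketch {
      interface Codec {
        boolean supportUnsafe();
        byte[] compressByte(byte[] input);
      }

      static byte[] compress(boolean offHeapEnabled, Codec codec, byte[] heapCopy) {
        if (offHeapEnabled && codec.supportUnsafe()) {
          // the real page would call codec.rawCompress(baseOffset, length, destOffset)
          return heapCopy; // placeholder for the raw-compressed result
        }
        return codec.compressByte(heapCopy); // heap fallback path
      }

      public static void main(String[] args) {
        Codec heapOnly = new Codec() {
          public boolean supportUnsafe() { return false; }
          public byte[] compressByte(byte[] input) { return input.clone(); }
        };
        // Off-heap is enabled, but the codec cannot use it, so the heap path runs.
        System.out.println(compress(true, heapOnly, new byte[]{1, 2}).length); // 2
      }
    }
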
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index ae57dcd..4693dba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * This extension uses unsafe memory to store page data, for variable length data type (string)
@@ -33,9 +32,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   /**
    * create a page
    */
-  UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+  UnsafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize)
       throws MemoryException {
-    super(columnSpec, dataType, pageSize);
+    super(columnPageEncoderMeta, pageSize);
     capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
     memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
     baseAddress = memoryBlock.getBaseObject();
@@ -85,7 +84,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override

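With this change the page constructors receive a ColumnPageEncoderMeta that
bundles the column spec, the store data type and the compressor name. A
minimal sketch of how a caller builds such a meta and requests a page (the
column name "city" is hypothetical, and "zstd" assumes the factory registers
the new compressor under that key):

    import org.apache.carbondata.core.datastore.ColumnType;
    import org.apache.carbondata.core.datastore.TableSpec;
    import org.apache.carbondata.core.datastore.page.ColumnPage;
    import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
    import org.apache.carbondata.core.memory.MemoryException;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;

    static ColumnPage newStringPage() throws MemoryException {
      // hypothetical string column stored as a byte-array page
      TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
          "city", DataTypes.STRING, ColumnType.PLAIN_VALUE);
      ColumnPageEncoderMeta meta =
          new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, "zstd");
      return ColumnPage.newPage(meta, 32000);  // default rows per page
    }
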
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 4edd201..7f0b2a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -64,13 +65,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   // size of the allocated memory, in bytes
   int capacity;
 
-  VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
-    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-        .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
+  VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(
+        columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, ColumnType.MEASURE);
     try {
-      rowOffset =
-          ColumnPage.newPage(spec, DataTypes.INT, pageSize);
+      rowOffset = ColumnPage.newPage(
+          new ColumnPageEncoderMeta(spec, DataTypes.INT, columnPageEncoderMeta.getCompressorName()),
+          pageSize);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }
@@ -79,44 +81,51 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void setBytePage(byte[] byteData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setShortPage(short[] shortData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setShortIntPage(byte[] shortIntData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setIntPage(int[] intData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setLongPage(long[] longData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setFloatPage(float[] floatData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setDoublePage(double[] doubleData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
-      throws MemoryException {
+  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
+      String compressorName) throws MemoryException {
     DecimalConverterFactory.DecimalConverter decimalConverter =
         DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
             columnSpec.getScale());
@@ -124,10 +133,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     if (size < 0) {
       return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
           DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
-          CarbonCommonConstants.INT_SIZE_IN_BYTE);
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
     } else {
       // Here the size is always fixed.
-      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
+      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
     }
   }
 
@@ -135,23 +144,26 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
   static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      int lvLength) throws MemoryException {
-    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+      int lvLength, String compressorName) throws MemoryException {
+    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+        lvLength, compressorName);
   }
 
   /**
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
   static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int lvLength) throws MemoryException {
-    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+      byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException {
+    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY,
+        lvLength, compressorName);
   }
 
   private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int size) throws MemoryException {
+      byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int offset;
     int rowId = 0;
@@ -165,9 +177,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+      page = new UnsafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+          rowId);
     } else {
-      page = new SafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
+      page = new SafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+          rowId);
     }
 
     // set total length and rowOffset in page
@@ -181,13 +197,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   }
 
   private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, DataType dataType, int lvLength)
+      byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory respectively
     int rowId = 0;
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int length;
     int offset;
@@ -202,20 +219,19 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
       counter++;
     }
     rowOffset.putInt(counter, offset);
-    VarLengthColumnPageBase page =
-        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
-            offset);
-    return page;
+    return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+        lvLength, rowId, rowOffset, offset, compressorName);
   }
 
   private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, DataType dataType, int lvLength)
+      byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory respectively
     int rowId = 0;
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
-    ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT,
+    ColumnPage rowOffset = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName),
         CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
     int length;
     int offset;
@@ -231,15 +247,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     }
     rowOffset.putInt(counter, offset);
 
-    VarLengthColumnPageBase page =
-        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
-             offset);
-    return page;
+    return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType,
+        lvLength, rowId, rowOffset, offset, compressorName);
   }
 
   private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
       byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, ColumnPage rowOffset,
-      int offset) throws MemoryException {
+      int offset, String compressorName) throws MemoryException {
     int lvEncodedOffset;
     int length;
     int numRows = rowId;
@@ -247,9 +261,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, dataType, numRows, inputDataLength);
+      page = new UnsafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows,
+          inputDataLength);
     } else {
-      page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
+      page = new SafeDecimalColumnPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows);
     }
 
     // set total length and rowOffset in page
@@ -269,32 +286,38 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void putByte(int rowId, byte value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putShort(int rowId, short value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putShortInt(int rowId, int value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putInt(int rowId, int value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putLong(int rowId, long value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   abstract void putBytesAtRow(int rowId, byte[] bytes);
@@ -317,72 +340,86 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public byte getByte(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short getShort(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int getShortInt(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int getInt(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long getLong(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short[] getShortPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getShortIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int[] getIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long[] getLongPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float[] getFloatPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double[] getDoublePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
@@ -445,7 +482,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   /**

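The var-length entry points now thread the compressor name through as well,
so the synthetic INT rowOffset page is created with the same compressor as
the data it indexes. The widened (package-private) signatures, as a sketch:

    // Callable from org.apache.carbondata.core.datastore.page only:
    ColumnPage decimalPage = VarLengthColumnPageBase.newDecimalColumnPage(
        columnSpec, lvEncodedBytes, compressorName);
    ColumnPage lvPage = VarLengthColumnPageBase.newLVBytesColumnPage(
        columnSpec, lvEncodedBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE,
        compressorName);
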
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index b5a63f8..2ed12a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -84,7 +84,8 @@ public abstract class ColumnPageEncoder {
   }
 
   private void fillBasicFields(ColumnPage inputPage, DataChunk2 dataChunk) {
-    dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta());
+    dataChunk.setChunk_meta(
+        CarbonMetadataUtil.getChunkCompressorMeta(inputPage.getColumnCompressorName()));
     dataChunk.setNumberOfRowsInpage(inputPage.getPageSize());
     dataChunk.setRowMajor(false);
   }
@@ -92,7 +93,8 @@ public abstract class ColumnPageEncoder {
   private void fillNullBitSet(ColumnPage inputPage, DataChunk2 dataChunk) {
     PresenceMeta presenceMeta = new PresenceMeta();
     presenceMeta.setPresent_bit_streamIsSet(true);
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        inputPage.getColumnCompressorName());
     presenceMeta.setPresent_bit_stream(
         compressor.compressByte(inputPage.getNullBits().toByteArray()));
     dataChunk.setPresence(presenceMeta);

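On the encoder side, the chunk metadata and the null bitset are now handled
by the page's own compressor instead of a hard-coded snappy instance. The
pattern, distilled from the two hunks above:

    // Resolve the compressor recorded on the page, then compress the
    // null-indicator bitset with it (sketch).
    Compressor compressor = CompressorFactory.getInstance().getCompressor(
        inputPage.getColumnCompressorName());
    byte[] presentBits = compressor.compressByte(
        inputPage.getNullBits().toByteArray());
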
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 4e04186..971cf24 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -45,14 +45,15 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   // storage data type of this column, it could be different from data type in the column spec
   private DataType storeDataType;
 
-  // compressor name for compressing and decompressing this column
-  private String compressorName;
+  // compressor name for compressing and decompressing this column.
+  // It is protected (not private) so that the subclass RLEEncoderMeta can reuse it
+  protected String compressorName;
 
   public ColumnPageEncoderMeta() {
   }
 
   public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
-      SimpleStatsResult stats, String compressorName) {
+      String compressorName) {
     if (columnSpec == null) {
       throw new IllegalArgumentException("column spec must not be null");
     }
@@ -66,6 +67,11 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     this.storeDataType = storeDataType;
     this.compressorName = compressorName;
     setType(DataType.convertType(storeDataType));
+  }
+
+  public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType,
+      SimpleStatsResult stats, String compressorName) {
+    this(columnSpec, storeDataType, compressorName);
     if (stats != null) {
       setDecimal(stats.getDecimalCount());
       setMaxValue(stats.getMax());

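The new stats-less constructor delegates to the full one, so both paths
record the compressor name. A short sketch of when each is used:

    // Without stats: synthetic pages such as the INT rowOffset page.
    ColumnPageEncoderMeta offsetMeta =
        new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName);
    // With stats: real data pages being encoded.
    ColumnPageEncoderMeta dataMeta =
        new ColumnPageEncoderMeta(spec, storeDataType, stats, compressorName);
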
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 1cc2ba8..29772d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -20,8 +20,6 @@ package org.apache.carbondata.core.datastore.page.encoding;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
@@ -73,39 +71,36 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
       ColumnPage inputPage) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
     switch (columnSpec.getColumnType()) {
       case GLOBAL_DICTIONARY:
       case DIRECT_DICTIONARY:
       case PLAIN_VALUE:
         return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
       case COMPLEX:
-        return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
+        return new ComplexDimensionIndexCodec(false, false).createEncoder(null);
       default:
-        throw new RuntimeException("unsupported dimension type: " +
-            columnSpec.getColumnType());
+        throw new RuntimeException("unsupported dimension type: " + columnSpec.getColumnType());
     }
   }
 
   private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
     switch (dimensionSpec.getColumnType()) {
       case GLOBAL_DICTIONARY:
         return new DictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+            .createEncoder(null);
       case DIRECT_DICTIONARY:
         return new DirectDictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
-            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            compressor).createEncoder(null);
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
+            .createEncoder(null);
       case PLAIN_VALUE:
         return new HighCardDictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
-            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR,
-            compressor).createEncoder(null);
+            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR)
+            .createEncoder(null);
       default:
         throw new RuntimeException("unsupported dimension type: " +
             dimensionSpec.getColumnType());

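The legacy dimension codecs no longer capture a process-wide compressor at
construction time; each encoder resolves the compressor from the page it is
encoding, so a single codec instance can serve tables configured with
different compressors. Construction now looks like this (sketch, matching
the signatures above):

    ColumnPageEncoder encoder = new DictDimensionIndexCodec(
        dimensionSpec.isInSortColumns(),
        dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex())
        .createEncoder(null);
    // the compressor is resolved later, per page, via
    // CompressorFactory.getInstance().getCompressor(page.getColumnCompressorName())
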
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 8bc67c0..d119c8f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -64,8 +64,8 @@ public abstract class EncodingFactory {
   /**
    * Return new decoder based on encoder metadata read from file
    */
-  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
-      throws IOException {
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+      String compressor) throws IOException {
     assert (encodings.size() == 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
@@ -111,21 +111,20 @@ public abstract class EncodingFactory {
     } else {
       // for backward compatibility
       ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
-      return createDecoderLegacy(metadata);
+      return createDecoderLegacy(metadata, compressor);
     }
   }
 
   /**
    * Old way of creating decoder, based on algorithm
    */
-  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+  public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
     if (null == metadata) {
       throw new RuntimeException("internal error");
     }
     SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
     TableSpec.ColumnSpec spec =
         TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE);
-    String compressor = "snappy";
     DataType dataType = DataType.getDataType(metadata.getType());
     if (dataType == DataTypes.BYTE ||
         dataType == DataTypes.SHORT ||

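On the read path the compressor name becomes an explicit argument, threaded
from the file metadata down to the legacy decoders, which previously
hard-coded "snappy". A minimal caller-side sketch, assuming the encodings
and encoder metas have already been read from the chunk metadata:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;

    static ColumnPage decodePage(EncodingFactory factory,
        List<Encoding> encodings, List<ByteBuffer> encoderMetas,
        String compressorName, byte[] input)
        throws IOException, MemoryException {
      ColumnPageDecoder decoder =
          factory.createDecoder(encodings, encoderMetas, compressorName);
      return decoder.decode(input, 0, input.length);
    }
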
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 0e8d1c0..bb928c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -67,16 +67,19 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
         input.convertValue(converter);
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
         return result;
@@ -92,7 +95,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index f20422c..ac9693d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -78,16 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
     return new ColumnPageEncoder() {
-      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
 
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
         input.convertValue(converter);
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
         return result;
@@ -96,7 +99,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType,
-            inputPage.getStatistics(), compressor.getName());
+            inputPage.getStatistics(), inputPage.getColumnCompressorName());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 6d7697b..028fa71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -59,15 +59,18 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -84,7 +87,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index cfc26c7..a9cf742 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -56,15 +56,18 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
     return new ColumnPageEncoder() {
       @Override
       protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
         if (encodedPage != null) {
           throw new IllegalStateException("already encoded");
         }
-        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+        encodedPage = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(),
+                targetDataType, input.getColumnPageEncoderMeta().getCompressorName()),
             input.getPageSize());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         input.convertValue(converter);
         byte[] result = encodedPage.compress(compressor);
         encodedPage.freeMemory();
@@ -81,7 +84,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       @Override
       protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
         return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
-            compressor.getName());
+            inputPage.getColumnCompressorName());
       }
 
     };

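All four adaptive codecs (delta floating, delta integral, floating,
integral) receive the same two-part change: the target page's meta now
carries the source page's compressor name, and the Compressor instance is
resolved inside encodeData() instead of being captured when the encoder is
created. Distilled:

    // Shared pattern across the adaptive codecs (sketch):
    encodedPage = ColumnPage.newPage(
        new ColumnPageEncoderMeta(input.getColumnSpec(), targetDataType,
            input.getColumnPageEncoderMeta().getCompressorName()),
        input.getPageSize());
    Compressor compressor = CompressorFactory.getInstance().getCompressor(
        input.getColumnCompressorName());
    byte[] result = encodedPage.compress(compressor);
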
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 7e1e9dd..aa03ec1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -55,69 +54,53 @@ public class DirectCompressCodec implements ColumnPageCodec {
 
   @Override
   public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
-    // TODO: make compressor configurable in create table
-    return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
-  }
-
-  @Override
-  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
-    return new DirectDecompressor(meta);
-  }
-
-  private class DirectCompressor extends ColumnPageEncoder {
-
-    private Compressor compressor;
-
-    DirectCompressor(String compressorName) {
-      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
-    }
-
-    @Override
-    protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
-      return input.compress(compressor);
-    }
+    return new ColumnPageEncoder() {
 
-    @Override
-    protected List<Encoding> getEncodingList() {
-      List<Encoding> encodings = new ArrayList<>();
-      encodings.add(dataType == DataTypes.VARCHAR ?
-          Encoding.DIRECT_COMPRESS_VARCHAR :
-          Encoding.DIRECT_COMPRESS);
-      return encodings;
-    }
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
+        return input.compress(compressor);
+      }
 
-    @Override
-    protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
-      return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
-          inputPage.getStatistics(), compressor.getName());
-    }
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(dataType == DataTypes.VARCHAR ?
+            Encoding.DIRECT_COMPRESS_VARCHAR :
+            Encoding.DIRECT_COMPRESS);
+        return encodings;
+      }
 
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
+            inputPage.getStatistics(), inputPage.getColumnCompressorName());
+      }
+    };
   }
 
-  private class DirectDecompressor implements ColumnPageDecoder {
-
-    private ColumnPageEncoderMeta meta;
-
-    DirectDecompressor(ColumnPageEncoderMeta meta) {
-      this.meta = meta;
-    }
-
-    @Override
-    public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-      ColumnPage decodedPage;
-      if (DataTypes.isDecimal(dataType)) {
-        decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
-      } else {
-        decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+  @Override
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+    return new ColumnPageDecoder() {
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+        ColumnPage decodedPage;
+        if (DataTypes.isDecimal(dataType)) {
+          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+        } else {
+          decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+        }
+        return LazyColumnPage.newPage(decodedPage, converter);
       }
-      return LazyColumnPage.newPage(decodedPage, converter);
-    }
 
-    @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
         throws MemoryException, IOException {
-      return LazyColumnPage
-          .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
-    }
+        return LazyColumnPage.newPage(
+            ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+      }
+    };
   }
 
   private ColumnPageValueConverter converter = new ColumnPageValueConverter() {

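DirectCompressCodec's named inner classes DirectCompressor and
DirectDecompressor collapse into anonymous ColumnPageEncoder and
ColumnPageDecoder instances, removing the last constructor that took a
compressor name. Usage under the new shape (sketch; spec and compressorName
are assumed to be in scope):

    DirectCompressCodec codec = new DirectCompressCodec(DataTypes.DOUBLE);
    ColumnPageEncoder encoder = codec.createEncoder(null);
    ColumnPageDecoder decoder = codec.createDecoder(
        new ColumnPageEncoderMeta(spec, DataTypes.DOUBLE, compressorName));
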
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
index e37b8f6..cc044cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -31,9 +32,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class ComplexDimensionIndexCodec extends IndexStorageCodec {
 
-  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -49,6 +49,8 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec {
         IndexStorage indexStorage =
             new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         byte[] compressed = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
         super.compressedDataPage = compressed;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index d157654..66f5f1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -32,8 +33,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class DictDimensionIndexCodec extends IndexStorageCodec {
 
-  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -54,6 +55,8 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
           indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 1e5015b..a130cbd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -32,9 +33,8 @@ import org.apache.carbondata.format.Encoding;
 
 public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
 
-  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) {
+    super(isSort, isInvertedIndex);
   }
 
   @Override
@@ -55,6 +55,8 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
           indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            inputPage.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index f9c124f..bce8523 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -37,8 +38,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
   private boolean isVarcharType;
 
   public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      boolean isVarcharType, Compressor compressor) {
-    super(isSort, isInvertedIndex, compressor);
+      boolean isVarcharType) {
+    super(isSort, isInvertedIndex);
     this.isVarcharType = isVarcharType;
   }
 
@@ -63,6 +64,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
               new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        Compressor compressor = CompressorFactory.getInstance().getCompressor(
+            input.getColumnCompressorName());
         super.compressedDataPage = compressor.compressByte(flattened);
         super.indexStorage = indexStorage;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
index cb6b387..13a9215 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
@@ -17,20 +17,17 @@
 
 package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
 
-import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 
 public abstract class IndexStorageCodec implements ColumnPageCodec {
-  protected Compressor compressor;
   protected boolean isSort;
   protected boolean isInvertedIndex;
 
-  IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+  IndexStorageCodec(boolean isSort, boolean isInvertedIndex) {
     this.isSort = isSort;
     this.isInvertedIndex = isInvertedIndex;
-    this.compressor = compressor;
   }
 
   @Override

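The same treatment covers the whole legacy dimension codec family: the
protected compressor field disappears from IndexStorageCodec, and every
encodeData() implementation resolves the compressor from the page being
encoded. The recurring fragment, distilled from the hunks above:

    byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
    Compressor compressor = CompressorFactory.getInstance().getCompressor(
        inputPage.getColumnCompressorName());
    byte[] compressed = compressor.compressByte(flattened);
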
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index fa03809..e7d4118 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -66,7 +66,7 @@ public class RLECodec implements ColumnPageCodec {
   public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
     assert meta instanceof RLEEncoderMeta;
     RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
-    return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize());
+    return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize(), meta.getCompressorName());
   }
 
   // This codec supports integral type only
@@ -151,7 +151,10 @@ public class RLECodec implements ColumnPageCodec {
     @Override
     protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
       return new RLEEncoderMeta(inputPage.getColumnSpec(),
-          inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
+          inputPage.getDataType(),
+          inputPage.getPageSize(),
+          inputPage.getStatistics(),
+          inputPage.getColumnCompressorName());
     }
 
     private void putValue(Object value) throws IOException {
@@ -281,11 +284,13 @@ public class RLECodec implements ColumnPageCodec {
 
     private TableSpec.ColumnSpec columnSpec;
     private int pageSize;
+    private String compressorName;
 
-    private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) {
+    private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize, String compressorName) {
       validateDataType(columnSpec.getSchemaDataType());
       this.columnSpec = columnSpec;
       this.pageSize = pageSize;
+      this.compressorName = compressorName;
     }
 
     @Override
@@ -293,7 +298,8 @@ public class RLECodec implements ColumnPageCodec {
         throws MemoryException, IOException {
       DataType dataType = columnSpec.getSchemaDataType();
       DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
-      ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
+      ColumnPage resultPage = ColumnPage.newPage(
+          new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
         decodeBytePage(in, resultPage);
       } else if (dataType == DataTypes.SHORT) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
index 8871671..25533f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -39,8 +39,8 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
   }
 
   public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
-      SimpleStatsResult stats) {
-    super(columnSpec, dataType, stats, "");
+      SimpleStatsResult stats, String compressorName) {
+    super(columnSpec, dataType, stats, compressorName);
     this.pageSize = pageSize;
   }
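
RLEEncoderMeta previously passed an empty compressor name to its parent; it
now records the real one, and RLEDecoder uses it when allocating the result
page. A round-trip sketch (the no-arg RLECodec constructor is assumed):

    RLEEncoderMeta meta = new RLEEncoderMeta(columnSpec, DataTypes.INT,
        pageSize, stats, compressorName);
    ColumnPageDecoder decoder = new RLECodec().createDecoder(meta);
    ColumnPage restored = decoder.decode(encodedBytes, 0, encodedBytes.length);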