Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/11/09 09:03:38 UTC

carbondata git commit: [CARBONDATA-2930] Support customize column compressor

Repository: carbondata
Updated Branches:
  refs/heads/master 5344b781e -> 6707db689


[CARBONDATA-2930] Support customize column compressor

Support customized column compressors so that users can add their own
compressor implementations. For a customized compressor, users can
directly use its full class name while creating a table or when setting
it in the system properties.
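
For example, a minimal sketch (assuming the property key is
CarbonCommonConstants.COMPRESSOR, i.e. 'carbon.column.compressor', and
'com.example.MyCompressor' is a hypothetical Compressor implementation):

  // set the customized compressor system-wide for subsequent data loads
  CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.COMPRESSOR, "com.example.MyCompressor");

  // or pin it per table at creation time (illustrative DDL):
  // CREATE TABLE t1 (col1 INT) STORED BY 'carbondata'
  //   TBLPROPERTIES('carbon.column.compressor'='com.example.MyCompressor')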

This closes #2715


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6707db68
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6707db68
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6707db68

Branch: refs/heads/master
Commit: 6707db6893478bef28e2e58c2e6bbfbfc2a46b27
Parents: 5344b78
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Sep 12 20:30:17 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Nov 9 17:03:19 2018 +0800

----------------------------------------------------------------------
 ...mpressedDimensionChunkFileBasedReaderV1.java |   2 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |   4 +-
 .../compression/CompressorFactory.java          |  87 +++++++--
 .../core/util/BlockletDataMapUtil.java          |   5 +-
 .../core/util/CarbonMetadataUtil.java           |   2 +-
 .../dataload/TestLoadDataWithCompression.scala  | 180 +++++++++++++++++--
 .../stream/CarbonStreamRecordReader.java        |   2 +-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 10 files changed, 249 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index b8c96f9..225b867 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -57,7 +57,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
     super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
     // for v1 store, the compressor is snappy
-    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
+    this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 025bac9..4e5285b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -49,7 +49,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
       final int[] eachColumnValueSize, final String filePath) {
     super(blockletInfo, eachColumnValueSize, filePath);
     // for v2 store, the compressor is snappy
-    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
+    this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index e1bcdc0..de9ffec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -98,7 +98,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
     ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
-        CompressorFactory.SupportedCompressor.SNAPPY.getName());
+        CompressorFactory.NativeSupportedCompressor.SNAPPY.getName());
     ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(),
         (int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
     decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 86083cd..80813e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -47,7 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
     super(blockletInfo, filePath);
-    this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor();
+    this.compressor = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor();
   }
 
   @Override
@@ -140,7 +140,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta);
     ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta,
-        CompressorFactory.SupportedCompressor.SNAPPY.getName());
+        CompressorFactory.NativeSupportedCompressor.SNAPPY.getName());
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
index 40459b1..f7d4e06 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
@@ -17,19 +17,24 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
 public class CompressorFactory {
+  private static final Logger LOGGER = LogServiceFactory.getLogService(
+      CompressorFactory.class.getName());
   private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory();
 
-  private final Map<String, SupportedCompressor> compressors = new HashMap<>();
+  private final Map<String, Compressor> allSupportedCompressors = new HashMap<>();
 
-  public enum SupportedCompressor {
+  public enum NativeSupportedCompressor {
     SNAPPY("snappy", SnappyCompressor.class),
     ZSTD("zstd", ZstdCompressor.class);
 
@@ -37,7 +42,7 @@ public class CompressorFactory {
     private Class<Compressor> compressorClass;
     private transient Compressor compressor;
 
-    SupportedCompressor(String name, Class compressorCls) {
+    NativeSupportedCompressor(String name, Class compressorCls) {
       this.name = name;
       this.compressorClass = compressorCls;
     }
@@ -54,7 +59,7 @@ public class CompressorFactory {
         try {
           this.compressor = compressorClass.newInstance();
         } catch (InstantiationException | IllegalAccessException e) {
-          throw new RuntimeException("Exception occurs while getting compressor for " + name);
+          throw new RuntimeException("Exception occurs while getting compressor for " + name, e);
         }
       }
       return this.compressor;
@@ -62,8 +67,9 @@ public class CompressorFactory {
   }
 
   private CompressorFactory() {
-    for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) {
-      compressors.put(supportedCompressor.getName(), supportedCompressor);
+    for (NativeSupportedCompressor nativeSupportedCompressor : NativeSupportedCompressor.values()) {
+      allSupportedCompressors.put(nativeSupportedCompressor.getName(),
+          nativeSupportedCompressor.getCompressor());
     }
   }
 
@@ -72,6 +78,46 @@ public class CompressorFactory {
   }
 
   /**
+   * Register the compressor using reflection.
+   * If a compressor with this class name has already been registered, the cached
+   * instance is returned; if reflection fails or the compressor name is invalid,
+   * a RuntimeException is thrown; if registration succeeds, the new instance is returned.
+   *
+   * @param compressorClassName full class name of the compressor
+   * @return the compressor instance corresponding to the given class name
+   */
+  private Compressor registerColumnCompressor(String compressorClassName) {
+    if (allSupportedCompressors.containsKey(compressorClassName)) {
+      return allSupportedCompressors.get(compressorClassName);
+    }
+
+    Class clazz;
+    try {
+      clazz = Class.forName(compressorClassName);
+      Object instance = clazz.newInstance();
+      if (instance instanceof Compressor) {
+        if (!((Compressor) instance).getName().equals(compressorClassName)) {
+          throw new RuntimeException(String.format("For not carbondata native supported compressor,"
+              + " the result of method getName() should be the full class name. Expected '%s',"
+              + " found '%s'", compressorClassName, ((Compressor) instance).getName()));
+        }
+        allSupportedCompressors.put(compressorClassName, (Compressor) instance);
+        LOGGER.info(
+            String.format("sucessfully register compressor %s to carbondata", compressorClassName));
+        return (Compressor) instance;
+      } else {
+        throw new RuntimeException(
+            String.format("Compressor '%s' should be a subclass of '%s'",
+                compressorClassName, Compressor.class.getCanonicalName()));
+      }
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      LOGGER.error(String.format("Failed to register compressor '%s'", compressorClassName), e);
+      throw new RuntimeException(
+          String.format("Failed to load compressor '%s', currently carbondata supports %s",
+              compressorClassName, StringUtils.join(allSupportedCompressors.keySet(), ", ")), e);
+    }
+  }
+  /**
    * get the default compressor.
    * This method can only be called in data load procedure to compress column page.
    * In query procedure, we should read the compressor information from the metadata
@@ -80,20 +126,27 @@ public class CompressorFactory {
   public Compressor getCompressor() {
     String compressorType = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR);
-    if (!compressors.containsKey(compressorType)) {
-      throw new UnsupportedOperationException(
-          "Invalid compressor type provided! Currently we only support "
-              + Arrays.toString(SupportedCompressor.values()));
-    }
     return getCompressor(compressorType);
   }
 
   public Compressor getCompressor(String name) {
-    if (compressors.containsKey(name.toLowerCase())) {
-      return compressors.get(name.toLowerCase()).getCompressor();
+    String internalCompressorName = getInternalCompressorName(name);
+    if (null == internalCompressorName) {
+      // maybe this is a new compressor, we will try to register it
+      return registerColumnCompressor(name);
+    } else {
+      return allSupportedCompressors.get(internalCompressorName);
+    }
+  }
+
+  // if we specify the compressor name in the table property, carbondata will convert the
+  // property value to lowercase, so here we ignore the case and find the real name.
+  private String getInternalCompressorName(String name) {
+    for (String key : allSupportedCompressors.keySet()) {
+      if (key.equalsIgnoreCase(name)) {
+        return key;
+      }
     }
-    throw new UnsupportedOperationException(
-        name + " compressor is not supported, currently we only support "
-            + Arrays.toString(SupportedCompressor.values()));
+    return null;
   }
 }
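
The lookup path above can be exercised as follows; a minimal sketch in Java,
where 'com.example.MyCompressor' is a hypothetical Compressor implementation:

  import org.apache.carbondata.core.datastore.compression.Compressor;
  import org.apache.carbondata.core.datastore.compression.CompressorFactory;

  public class CompressorLookupSketch {
    public static void main(String[] args) {
      // built-in names are matched case-insensitively by getInternalCompressorName()
      Compressor snappy = CompressorFactory.getInstance().getCompressor("SNAPPY");
      System.out.println(snappy.getName()); // prints "snappy"

      // an unknown name is treated as a full class name and registered via
      // reflection; the instance's getName() must return that exact class name
      Compressor custom = CompressorFactory.getInstance()
          .getCompressor("com.example.MyCompressor"); // hypothetical custom class
      System.out.println(custom.getName());
    }
  }

A name that is neither built-in nor loadable by reflection raises a
RuntimeException listing the currently supported compressors.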

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 5f78135..fbdeff8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -359,7 +359,8 @@ public class BlockletDataMapUtil {
     }
     byte[] byteArray = stream.toByteArray();
     // Compress to reduce the size of schema
-    return CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray);
+    return CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().compressByte(
+        byteArray);
   }
 
   /**
@@ -370,7 +371,7 @@ public class BlockletDataMapUtil {
    */
   public static List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
     // uncompress it.
-    schemaArray = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(
+    schemaArray = CompressorFactory.NativeSupportedCompressor.SNAPPY.getCompressor().unCompressByte(
         schemaArray);
     ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
     DataInput schemaInput = new DataInputStream(schemaStream);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 9c82fa4..0fe33b0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -337,7 +337,7 @@ public class CarbonMetadataUtil {
       return chunkCompressionMeta.getCompressor_name();
     } else {
       // this is for legacy store before 1.5.0
-      return CompressorFactory.SupportedCompressor.SNAPPY.getName();
+      return CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
     }
   }
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 21fbfc1..f5153fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.integration.spark.testsuite.dataload
 
 import java.io.File
+import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.concurrent.{ExecutorService, Executors, Future}
 import java.util.Calendar
@@ -31,9 +32,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.Compressor
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.exception.InvalidConfigurationException
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -42,6 +43,112 @@ case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintFi
     dateField: String, charField: String, floatField: Float, stringDictField: String,
     stringSortField: String, stringLocalDictField: String, longStringField: String)
 
+/**
+ * This compressor will not actually compress or decompress anything.
+ * It is used for the test case of specifying a customized compressor.
+ */
+class CustomizeCompressor extends Compressor {
+  override def getName: String = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
+
+  override def compressByte(unCompInput: Array[Byte]): Array[Byte] = unCompInput
+
+  override def compressByte(unCompInput: Array[Byte], byteSize: Int): Array[Byte] = unCompInput
+
+  override def unCompressByte(compInput: Array[Byte]): Array[Byte] = compInput
+
+  override def unCompressByte(compInput: Array[Byte], offset: Int, length: Int): Array[Byte] = compInput
+
+  override def compressShort(unCompInput: Array[Short]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT)
+    buffer.asShortBuffer().put(unCompInput)
+    compressByte(buffer.array())
+  }
+
+  override def unCompressShort(compInput: Array[Byte], offset: Int, length: Int): Array[Short] = {
+    val buffer = ByteBuffer.wrap(compInput).asShortBuffer()
+    val res = new Array[Short](compInput.length / ByteUtil.SIZEOF_SHORT)
+    buffer.get(res)
+    res
+  }
+
+  override def compressInt(unCompInput: Array[Int]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT)
+    buffer.asIntBuffer().put(unCompInput)
+    compressByte(buffer.array())
+  }
+
+  override def unCompressInt(compInput: Array[Byte], offset: Int, length: Int): Array[Int] = {
+    val buffer = ByteBuffer.wrap(compInput).asIntBuffer()
+    val res = new Array[Int](compInput.length / ByteUtil.SIZEOF_INT)
+    buffer.get(res)
+    res
+  }
+
+  override def compressLong(unCompInput: Array[Long]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG)
+    buffer.asLongBuffer().put(unCompInput)
+    compressByte(buffer.array())
+  }
+
+  override def unCompressLong(compInput: Array[Byte], offset: Int, length: Int): Array[Long] = {
+    val buffer = ByteBuffer.wrap(compInput).asLongBuffer()
+    val res = new Array[Long](compInput.length / ByteUtil.SIZEOF_LONG)
+    buffer.get(res)
+    res
+  }
+
+  override def compressFloat(unCompInput: Array[Float]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT)
+    buffer.asFloatBuffer().put(unCompInput)
+    compressByte(buffer.array())
+  }
+
+  override def unCompressFloat(compInput: Array[Byte], offset: Int, length: Int): Array[Float] = {
+    val buffer = ByteBuffer.wrap(compInput).asFloatBuffer()
+    val res = new Array[Float](compInput.length / ByteUtil.SIZEOF_FLOAT)
+    buffer.get(res)
+    res
+  }
+
+  override def compressDouble(unCompInput: Array[Double]): Array[Byte] = {
+    val buffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE)
+    buffer.asDoubleBuffer().put(unCompInput)
+    compressByte(buffer.array())
+  }
+
+  override def unCompressDouble(compInput: Array[Byte], offset: Int, length: Int): Array[Double] = {
+    val buffer = ByteBuffer.wrap(compInput).asDoubleBuffer()
+    val res = new Array[Double](compInput.length / ByteUtil.SIZEOF_DOUBLE)
+    buffer.get(res)
+    res
+  }
+
+  override def rawCompress(inputAddress: Long, inputSize: Int, outputAddress: Long): Long = {
+    throw new RuntimeException("Not implemented rawCompress for customized compressor yet")
+  }
+
+  override def rawUncompress(input: Array[Byte], output: Array[Byte]): Long = {
+    System.arraycopy(input, 0, output, 0, input.length)
+    input.length
+  }
+
+  override def maxCompressedLength(inputSize: Long): Long = {
+    inputSize
+  }
+
+  /**
+   * Whether this compressor supports zero-copy during compression.
+   * Zero-copy means that the compressor supports receiving a memory address (pointer)
+   * and returning the result in a memory address (pointer).
+   * Currently not all Java versions of the compressors support this feature.
+   *
+   * @return true if it supports zero-copy, false otherwise
+   */
+  override def supportUnsafe(): Boolean = {
+    false
+  }
+}
+
 class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   private val tableName = "load_test_with_compressor"
   private var executorService: ExecutorService = _
@@ -161,7 +268,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "ZSTD")
     loadData()
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
   }
@@ -206,14 +313,13 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
   }
 
-  test("test data loading with unsupported compressor and onheap") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+  test("test data loading with unsupported compressor") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
     createTable()
-    val exception = intercept[UnsupportedOperationException] {
+    val exception = intercept[RuntimeException] {
       loadData()
     }
-    assert(exception.getMessage.contains("Invalid compressor type provided"))
+    assert(exception.getMessage.contains("Failed to load compressor 'fake'"))
   }
 
   test("test compaction with unsupported compressor") {
@@ -222,10 +328,10 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
-    val exception = intercept[UnsupportedOperationException] {
+    val exception = intercept[RuntimeException] {
       sql(s"ALTER TABLE $tableName COMPACT 'major'")
     }
-    assert(exception.getMessage.contains("Invalid compressor type provided"))
+    assert(exception.getMessage.contains("Failed to load compressor 'fake'"))
   }
 
   private def generateAllDataTypeDF(lineNum: Int) = {
@@ -309,7 +415,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
     val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
     val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
-    assert("zstd".equalsIgnoreCase(tableColumnCompressor))
+    assertResult("zstd")(tableColumnCompressor)
   }
 
   test("test creating table with unsupported compressor") {
@@ -317,10 +423,60 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     // the system configuration for compressor is snappy
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
     // create table with unsupported compressor
-    val exception = intercept[InvalidConfigurationException] {
+    val exception = intercept[RuntimeException] {
       createTable (columnCompressor = "fakecompressor")
     }
-    assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
+    assert(exception.getMessage.contains("Failed to load compressor 'fakecompressor'"))
+  }
+
+  test("test load data with customize compressor") {
+    createTable()
+    // fist usage of this compressor will register it
+    var compressorName = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+
+    // reuse the registered compressor
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8 * 2)))
+
+    // cannot register a compressor whose 'getName' result is not its full class name
+    compressorName = "org.apache.carbondata.core.datastore.compression.ZstdCompressor"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+    var exception = intercept[RuntimeException] {
+      loadData()
+    }
+    assertResult("For not carbondata native supported compressor, the result of method getName() should be the full class name. Expected 'org.apache.carbondata.core.datastore.compression.ZstdCompressor', found 'zstd'")(exception.getMessage)
+
+    // cannot register a compressor when reflection fails
+    compressorName = "some.unknow.fakecompressor"
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+    exception = intercept[RuntimeException] {
+      loadData()
+    }
+    assert(exception.getMessage.contains("Failed to load compressor 'some.unknow.fakecompressor'"))
+  }
+
+  test("test create table with customize compressor") {
+    val compressorName = "org.apache.carbondata.integration.spark.testsuite.dataload.CustomizeCompressor"
+    // first usage of this customize compressor will register it
+    createTable(columnCompressor = compressorName)
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
+    val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
+    assertResult(compressorName.toLowerCase())(tableColumnCompressor)
+
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    // reuse the customized compressor again
+    createTable(columnCompressor = compressorName)
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    val carbonTable2 = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
+    val tableColumnCompressor2 = carbonTable2.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
+    assertResult(compressorName.toLowerCase())(tableColumnCompressor2)
   }
 
   private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
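
The two failure modes asserted in the tests above can be reproduced directly
against the factory; a minimal Java sketch ('some.unknown.Fake' is a
hypothetical class name):

  import org.apache.carbondata.core.datastore.compression.CompressorFactory;

  public class CompressorRegistrationCheck {
    public static void main(String[] args) {
      CompressorFactory factory = CompressorFactory.getInstance();

      // getName() mismatch: ZstdCompressor reports "zstd", not its class name,
      // so registering it under its full class name is rejected
      try {
        factory.getCompressor(
            "org.apache.carbondata.core.datastore.compression.ZstdCompressor");
      } catch (RuntimeException e) {
        System.out.println(e.getMessage()); // "... should be the full class name ..."
      }

      // unknown class: reflection fails and the supported compressors are listed
      try {
        factory.getCompressor("some.unknown.Fake");
      } catch (RuntimeException e) {
        System.out.println(e.getMessage()); // "Failed to load compressor ..."
      }
    }
  }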

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 2ca023f..b66e446 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -266,7 +266,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     if (header.isSetCompressor_name()) {
       compressorName = header.getCompressor_name();
     } else {
-      compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+      compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
     }
     return header.getSync_marker();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6707db68/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 1ec0030..2a95935 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -176,7 +176,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       if (header.isSetCompressor_name()) {
         compressorName = header.getCompressor_name();
       } else {
-        compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+        compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
       }
     } else {
       // IF the file is not existed, use the create api