You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/17 14:16:50 UTC

[08/21] carbondata git commit: [CARBONDATA-3005]Support Gzip as column compressor

[CARBONDATA-3005]Support Gzip as column compressor

This PR adds a new column compressor, "gzip", extending the compression options offered by CarbonData.
Users can now choose gzip as the compressor when loading data.
Gzip can be set either at the system-property level or for a particular table.

This closes #2847


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f57294d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f57294d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f57294d

Branch: refs/heads/branch-1.5
Commit: 8f57294d14532457d6fd3877a182f66055d338c2
Parents: 22bec10
Author: shardul-cr7 <sh...@gmail.com>
Authored: Tue Oct 23 17:27:47 2018 +0530
Committer: Raghunandan S <ca...@gmail.com>
Committed: Mon Dec 17 18:58:33 2018 +0530

----------------------------------------------------------------------
 .../compression/AbstractCompressor.java         |   1 +
 .../compression/CompressorFactory.java          |   3 +-
 .../datastore/compression/GzipCompressor.java   | 134 +++++++++++++++++++
 .../datastore/compression/ZstdCompressor.java   |   5 -
 .../dataload/TestLoadDataWithCompression.scala  |  94 ++++++++++---
 .../TestLoadWithSortTempCompressed.scala        |  20 +++
 6 files changed, 236 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
index 0724bdc..c554dc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
@@ -123,4 +123,5 @@ public abstract class AbstractCompressor implements Compressor {
     return false;
   }
 
+  @Override public boolean supportUnsafe() { return false; } // default; override if supported
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
index f7d4e06..b7779ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java
@@ -36,7 +36,8 @@ public class CompressorFactory {
 
   public enum NativeSupportedCompressor {
     SNAPPY("snappy", SnappyCompressor.class),
-    ZSTD("zstd", ZstdCompressor.class);
+    ZSTD("zstd", ZstdCompressor.class),
+    GZIP("gzip", GzipCompressor.class);
 
     private String name;
     private Class<Compressor> compressorClass;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
new file mode 100644
index 0000000..b386913
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/GzipCompressor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+
+/**
+ * Codec Class for performing Gzip Compression
+ */
+public class GzipCompressor extends AbstractCompressor {
+
+  /** Upper bound for the scratch buffer used while inflating; the output grows on demand. */
+  private static final int MAX_READ_BUFFER_SIZE = 64 * 1024;
+
+  /** gzip header (10 bytes, no file name/comment with default parameters) plus 8-byte trailer. */
+  private static final int GZIP_WRAPPER_SIZE = 10 + 8;
+
+  @Override public String getName() {
+    return "gzip";
+  }
+
+  /**
+   * Compresses the whole byte array into gzip format.
+   *
+   * @param data bytes to compress
+   * @return gzip encoded bytes
+   */
+  private byte[] compressData(byte[] data) {
+    // half of the input is only a sizing guess; ByteArrayOutputStream grows when needed
+    int initialSize = (data.length / 2) == 0 ? data.length : data.length / 2;
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(initialSize);
+    // the stream must be closed (finishing the gzip data) before toByteArray() is called
+    try (GzipCompressorOutputStream gzipCompressorOutputStream =
+        new GzipCompressorOutputStream(byteArrayOutputStream)) {
+      try {
+        gzipCompressorOutputStream.write(data);
+      } catch (IOException e) {
+        throw new RuntimeException("Error during Compression writing step ", e);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error during Compression step ", e);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Decompresses the gzip bytes stored in {@code data[offset, offset + length)}.
+   *
+   * @param data   array holding the gzip encoded bytes
+   * @param offset index of the first gzip byte in {@code data}
+   * @param length number of gzip bytes starting at {@code offset}
+   * @return the decompressed bytes
+   */
+  private byte[] decompressData(byte[] data, int offset, int length) {
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data, offset, length);
+    ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+    // scratch buffer sized from the compressed slice (not the backing array), computed in long
+    // arithmetic so it cannot overflow, and capped; the output stream grows as needed anyway
+    byte[] buffer = new byte[(int) Math.max(1L, Math.min(2L * length, MAX_READ_BUFFER_SIZE))];
+    // closing the gzip stream ends its native Inflater instead of leaving it to the GC
+    try (GzipCompressorInputStream gzipCompressorInputStream =
+        new GzipCompressorInputStream(byteArrayInputStream)) {
+      int len;
+      while ((len = gzipCompressorInputStream.read(buffer)) != -1) {
+        byteOutputStream.write(buffer, 0, len);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error during Decompression step ", e);
+    }
+    return byteOutputStream.toByteArray();
+  }
+
+  @Override public byte[] compressByte(byte[] unCompInput) {
+    return compressData(unCompInput);
+  }
+
+  // NOTE(review): byteSize is ignored and the whole array is compressed; confirm that is intended
+  @Override public byte[] compressByte(byte[] unCompInput, int byteSize) {
+    return compressData(unCompInput);
+  }
+
+  @Override public byte[] unCompressByte(byte[] compInput) {
+    return decompressData(compInput, 0, compInput.length);
+  }
+
+  @Override public byte[] unCompressByte(byte[] compInput, int offset, int length) {
+    return decompressData(compInput, offset, length);
+  }
+
+  @Override public long rawUncompress(byte[] input, byte[] output) {
+    throw new UnsupportedOperationException("Not implemented rawUcompress for gzip yet");
+  }
+
+  /**
+   * Worst-case gzip output size: deflate can expand incompressible input, so this adds zlib's
+   * conservative deflateBound() overhead plus the gzip header and trailer.
+   */
+  @Override public long maxCompressedLength(long inputSize) {
+    if (inputSize < Integer.MAX_VALUE) {
+      return inputSize + ((inputSize + 7) >> 3) + ((inputSize + 63) >> 6) + 5 + GZIP_WRAPPER_SIZE;
+    } else {
+      throw new RuntimeException("compress input oversize for gzip");
+    }
+  }
+
+  @Override public int unCompressedLength(byte[] data, int offset, int length) {
+    // the commons-compress gzip API offers no uncompressed-length query without inflating
+    throw new UnsupportedOperationException("Unsupported operation Exception");
+  }
+
+  @Override public int rawUncompress(byte[] data, int offset, int length, byte[] output) {
+    throw new UnsupportedOperationException("Not implemented rawUcompress for gzip yet");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
index 8523c46..56faa20 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java
@@ -70,11 +70,6 @@ public class ZstdCompressor extends AbstractCompressor {
    * currently java version of zstd does not support this feature.
    * It may support it in upcoming release 1.3.5-3, then we can optimize this accordingly.
    */
-  @Override
-  public boolean supportUnsafe() {
-    return false;
-  }
-
   @Override public int unCompressedLength(byte[] data, int offset, int length) {
     throw new RuntimeException("Unsupported operation Exception");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 6ec44e7..b12e269 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -168,6 +168,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
   private val tableName = "load_test_with_compressor"
   private var executorService: ExecutorService = _
   private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"
+  private val compressors = Array("snappy", "zstd", "gzip") // codecs the load tests iterate over
 
   override protected def beforeAll(): Unit = {
     executorService = Executors.newFixedThreadPool(3)
@@ -252,50 +253,94 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
        """.stripMargin)
   }
 
-  test("test data loading with snappy compressor and offheap") {
+  test("test data loading with different compressors and offheap") {
+    compressors.foreach { compressorName =>
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+      createTable()
+      loadData()
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    }
+  }
+
+  test("test data loading with different compressors and onheap") {
+    compressors.foreach { compressorName =>
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, compressorName)
+      createTable()
+      loadData()
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    }
+  }
+
+  test("test current zstd compressor on legacy store with snappy") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
     createTable()
     loadData()
-    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
-  }
 
-  test("test data loading with zstd compressor and offheap") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test current gzip compressor on legacy store with snappy") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
     createTable()
     loadData()
-    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
   }
 
-  test("test data loading with zstd compressor and onheap") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+  test("test current snappy compressor on legacy store with zstd") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
     createTable()
     loadData()
-    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
-  }
 
-  test("test current zstd compressor on legacy store with snappy") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test current snappy compressor on legacy store with gzip") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
     createTable()
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "ZSTD")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
     loadData()
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
   }
 
-  test("test current snappy compressor on legacy store with zstd") {
+  test("test current gzip compressor on legacy store with zstd") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
     createTable()
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test current zstd compressor on legacy store with gzip") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
     loadData()
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
   }
@@ -311,7 +356,7 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
     loadData()
 
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
@@ -416,11 +461,17 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
     }
 
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "gzip")
+    future = compactAsync()
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "gzip")
+    }
+
     checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
     checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
   }
 
-  test("test creating table with specified compressor") {
+  test("test creating table with specified zstd compressor") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     // the system configuration for compressor is snappy
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
@@ -433,6 +484,19 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     assertResult("zstd")(tableColumnCompressor)
   }
 
+  test("test creating table with specified gzip compressor") {
+    // system-level compressor is snappy while the table itself is created with gzip
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable(columnCompressor = "gzip")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    // the compressor recorded in the fact table's properties must be gzip (case-insensitive)
+    val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
+    val factTableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
+    assert("gzip".equalsIgnoreCase(factTableProperties.get(CarbonCommonConstants.COMPRESSOR)))
+  }
+
   test("test creating table with unsupported compressor") {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     // the system configuration for compressor is snappy

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f57294d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
index 21affee..5d9f05c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -137,6 +137,26 @@ class TestLoadWithSortTempCompressed extends QueryTest
       originOffHeapStatus)
   }
 
+  test("test data load for simple table with sort temp compressed with gzip" +
+       " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "gzip") // NOTE(review): sort-temp compressor is not reset here; confirm cleanup elsewhere
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus) // reset to originOffHeapStatus (presumably the pre-test value)
+  }
+
+  test("test data load for simple table with sort temp compressed with gzip" +
+       " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "gzip") // NOTE(review): sort-temp compressor is not reset here; confirm cleanup elsewhere
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus) // reset to originOffHeapStatus (presumably the pre-test value)
+  }
+
   private def testComplexTable(): Unit = {
     // note: following tests are copied from `TestComplexTypeQuery`
     sql(