You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:33 UTC

[32/50] [abbrv] carbondata git commit: [CARBONDATA-2553] support ZSTD compression for sort temp file

[CARBONDATA-2553] support ZSTD compression for sort temp file

This closes #2350


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ece06729
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ece06729
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ece06729

Branch: refs/heads/carbonstore
Commit: ece0672930b8bffba8e9bddad63560ff9d6cd582
Parents: 5593d16
Author: Manhua <ke...@qq.com>
Authored: Tue May 29 09:21:52 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Jun 18 21:31:02 2018 +0800

----------------------------------------------------------------------
 core/pom.xml                                    |  5 ++
 .../core/constants/CarbonCommonConstants.java   |  2 +-
 .../datastore/filesystem/LocalCarbonFile.java   |  8 +++
 .../carbondata/core/util/CarbonProperties.java  |  4 +-
 docs/useful-tips-on-carbondata.md               |  2 +-
 .../TestLoadWithSortTempCompressed.scala        | 51 +++++++++++++++++++-
 6 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 7d87037..c145c3b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -68,6 +68,11 @@
       <version>${snappy.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.3.2-2</version>
+    </dependency>
+    <dependency>
       <groupId>org.jmockit</groupId>
       <artifactId>jmockit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2fcf0f5..355bcb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1378,7 +1378,7 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SORT_TEMP_COMPRESSOR = "carbon.sort.temp.compressor";
 
   /**
-   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4'.
+   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD'.
    * By default, empty means that Carbondata will not compress the sort temp files.
    */
   public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 60b7e17..5b6f657 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -42,6 +42,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
 import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@@ -290,6 +292,8 @@ public class LocalCarbonFile implements CarbonFile {
       inputStream = new SnappyInputStream(new FileInputStream(path));
     } else if ("LZ4".equalsIgnoreCase(compressor)) {
       inputStream = new LZ4BlockInputStream(new FileInputStream(path));
+    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
+      inputStream = new ZstdInputStream(new FileInputStream(path));
     } else {
       throw new IOException("Unsupported compressor: " + compressor);
     }
@@ -368,6 +372,10 @@ public class LocalCarbonFile implements CarbonFile {
       outputStream = new SnappyOutputStream(new FileOutputStream(path));
     } else if ("LZ4".equalsIgnoreCase(compressor)) {
       outputStream = new LZ4BlockOutputStream(new FileOutputStream(path));
+    } else if ("ZSTD".equalsIgnoreCase(compressor)) {
+      // compression level 1 is cost-effective for sort temp file
+      // which is not used for storage
+      outputStream = new ZstdOutputStream(new FileOutputStream(path), 1);
     } else {
       throw new IOException("Unsupported compressor: " + compressor);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index b134a7c..dc50ab0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1290,11 +1290,11 @@ public final class CarbonProperties {
     String compressor = getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
         CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT).toUpperCase();
     if (compressor.isEmpty() || "SNAPPY".equals(compressor) || "GZIP".equals(compressor)
-        || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
+        || "BZIP2".equals(compressor) || "LZ4".equals(compressor) || "ZSTD".equals(compressor)) {
       return compressor;
     } else {
       LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
-          .concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and")
+          .concat(" configuration value is invalid. Only snappy, gzip, bzip2, lz4, zstd and")
           .concat(" empty are allowed. It will not compress the sort temp files by default"));
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md
index 732d38f..d00f785 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -168,7 +168,7 @@
   | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data loading | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
-  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
+  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
   | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB~1GB. |
   | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable node minumun input data size allocation strategy for data loading.| When loading, carbondata will use node minumun input data size allocation strategy for task distribution. It will make sure the node load the minimum amount of data -- It's useful if the size of your input data files very small, say 1MB~256MB,Avoid generating a large number of small files. |
   

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ece06729/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
index 61acea4..5fbdd14 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -50,9 +50,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
 
   override protected def beforeAll(): Unit = {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
-      "SNAPPY")
   }
+
   override def afterAll(): Unit = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
@@ -84,6 +83,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for simple table with sort temp compressed with snappy" +
        " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     testSimpleTable()
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -92,6 +93,28 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for simple table with sort temp compressed with snappy" +
        " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with zstd" +
+       " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testSimpleTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with zstd" +
+       " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
     testSimpleTable()
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -138,6 +161,8 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for complex table with sort temp compressed with snappy" +
        " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
     testComplexTable()
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
@@ -146,6 +171,28 @@ class TestLoadWithSortTempCompressed extends QueryTest
 
   test("test data load for complex table with sort temp compressed with snappy" +
        " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    testComplexTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with zstd" +
+       " and off-heap sort enabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    testComplexTable()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with zstd" +
+       " and off-heap sort disabled") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "ZSTD")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
     testComplexTable()
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,