You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2020/08/11 22:38:56 UTC

[orc] branch branch-1.6 updated: ORC-639: Improve zstd performance (#537)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new c44a02e  ORC-639: Improve zstd performance (#537)
c44a02e is described below

commit c44a02ee0c6708a166ddb91f8d4581091a69d5f6
Author: Ion Gaztañaga <ig...@gmail.com>
AuthorDate: Wed Aug 12 00:37:34 2020 +0200

    ORC-639: Improve zstd performance (#537)
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve ZSTD performance in branch-1.6. This is a backport of #511
    
    ### Why are the changes needed?
    
    Zstd's "Simple API" is currently used, which initializes the compression/decompression context on every call. This includes the time needed to initialize all compression/decompression tables. Using the "Explicit context" API improves performance significantly because the compression/decompression tables are constructed once and reused each time "decompress" or "doBlockCompression" is called. We've noticed more than a 15% time improvement in some applications with this change, so it seems that any ORC user using Zstd [...]
    
    ### How was this patch tested?
    
    The existing unit tests (UT) pass.
---
 c++/src/Compression.cc | 86 +++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 75 insertions(+), 11 deletions(-)

diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 041abbb..4278ed7 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -1031,12 +1031,16 @@ DIAGNOSTIC_POP
                                                    capacity,
                                                    blockSize,
                                                    pool) {
-      // PASS
+      this->init();
     }
 
     virtual std::string getName() const override {
       return "ZstdCompressionStream";
     }
+    
+    virtual ~ZSTDCompressionStream() override {
+      this->end();  // free the zstd context that init() created
+    }
 
   protected:
     virtual uint64_t doBlockCompression() override;
@@ -1044,15 +1048,43 @@ DIAGNOSTIC_POP
     virtual uint64_t estimateMaxCompressionSize() override {
       return ZSTD_compressBound(static_cast<size_t>(bufferSize));
     }
+    
+  private:
+    void init();      // creates cctx; throws std::runtime_error on failure
+    void end();       // frees cctx and sets it to nullptr
+    ZSTD_CCtx *cctx;  // zstd context passed to ZSTD_compressCCtx()
   };
 
   uint64_t ZSTDCompressionStream::doBlockCompression() {
-    return ZSTD_compress(compressorBuffer.data(),
-                         compressorBuffer.size(),
-                         rawInputBuffer.data(),
-                         static_cast<size_t>(bufferSize),
-                         level);
+    // Compress bufferSize bytes of rawInputBuffer into compressorBuffer,
+    // reusing the stream's zstd context instead of a per-call one.
+    const size_t compressedSize = ZSTD_compressCCtx(
+        cctx, compressorBuffer.data(), compressorBuffer.size(),
+        rawInputBuffer.data(), static_cast<size_t>(bufferSize), level);
+    return compressedSize;
   }
+  
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+  // Creates the zstd compression context; throws if zstd returns null.
+  void ZSTDCompressionStream::init() {
+    cctx = ZSTD_createCCtx();
+    if (!cctx) {
+      throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd.");
+    }
+  }
+
+  // Frees the compression context and clears the pointer.
+  void ZSTDCompressionStream::end() {
+    (void)ZSTD_freeCCtx(cctx);  // return code deliberately discarded
+    cctx = nullptr;
+  }
+
+DIAGNOSTIC_POP
 
   /**
    * ZSTD block decompression
@@ -1065,7 +1097,11 @@ DIAGNOSTIC_POP
                             : BlockDecompressionStream(std::move(inStream),
                                                        blockSize,
                                                        pool) {
-      // PASS
+      this->init();
+    }
+
+    virtual ~ZSTDDecompressionStream() override {
+      this->end();  // free the zstd context that init() created
     }
 
     std::string getName() const override {
@@ -1079,18 +1115,46 @@ DIAGNOSTIC_POP
                                 uint64_t length,
                                 char *output,
                                 size_t maxOutputLength) override;
+
+  private:
+    void init();      // creates dctx; throws std::runtime_error on failure
+    void end();       // frees dctx and sets it to nullptr
+    ZSTD_DCtx *dctx;  // zstd context passed to ZSTD_decompressDCtx()
   };
 
   uint64_t ZSTDDecompressionStream::decompress(const char *input,
                                                uint64_t length,
                                                char *output,
                                                size_t maxOutputLength) {
-    return static_cast<uint64_t>(ZSTD_decompress(output,
-                                                 maxOutputLength,
-                                                 input,
-                                                 length));
+    // zstd returns an error code (not a length) on failure, so check it.
+    const size_t n = ZSTD_decompressDCtx(dctx, output, maxOutputLength,
+                                         input, length);
+    if (ZSTD_isError(n)) throw std::runtime_error(ZSTD_getErrorName(n));
+    return static_cast<uint64_t>(n);
+  }
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+  // Creates the zstd decompression context; throws if zstd returns null.
+  void ZSTDDecompressionStream::init() {
+    dctx = ZSTD_createDCtx();
+    if (!dctx) {
+      throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd.");
+    }
   }
 
+  // Frees the decompression context and clears the pointer.
+  void ZSTDDecompressionStream::end() {
+    (void)ZSTD_freeDCtx(dctx);  // return code deliberately discarded
+    dctx = nullptr;
+  }
+
+DIAGNOSTIC_POP
+
   std::unique_ptr<BufferedOutputStream>
      createCompressor(
                       CompressionKind kind,