You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2020/08/11 22:38:56 UTC
[orc] branch branch-1.6 updated: ORC-639: Improve zstd performance (#537)
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new c44a02e ORC-639: Improve zstd performance (#537)
c44a02e is described below
commit c44a02ee0c6708a166ddb91f8d4581091a69d5f6
Author: Ion Gaztañaga <ig...@gmail.com>
AuthorDate: Wed Aug 12 00:37:34 2020 +0200
ORC-639: Improve zstd performance (#537)
### What changes were proposed in this pull request?
This PR aims to improve ZSTD performance in branch-1.6. This is a backport of #511.
### Why are the changes needed?
Zstd's "Simple API" is currently used, which initializes the compression/decompression context on every call. This includes the time needed to initialize all compression/decompression tables. Using the "Explicit context" API improves performance significantly, because the compression/decompression tables are constructed once and then reused each time "decompress" or "doBlockCompression" is called. We've noticed a time improvement of more than 15% in some applications with this change, so it seems that any ORC user using Zstd [...]
### How was this patch tested?
Passes the existing unit tests (UT).
---
c++/src/Compression.cc | 86 +++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 75 insertions(+), 11 deletions(-)
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 041abbb..4278ed7 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -1031,12 +1031,16 @@ DIAGNOSTIC_POP
capacity,
blockSize,
pool) {
- // PASS
+ this->init();
}
virtual std::string getName() const override {
return "ZstdCompressionStream";
}
+
+ virtual ~ZSTDCompressionStream() override {
+ this->end();
+ }
protected:
virtual uint64_t doBlockCompression() override;
@@ -1044,15 +1048,43 @@ DIAGNOSTIC_POP
virtual uint64_t estimateMaxCompressionSize() override {
return ZSTD_compressBound(static_cast<size_t>(bufferSize));
}
+
+ private:
+ void init();
+ void end();
+ ZSTD_CCtx *cctx;
};
uint64_t ZSTDCompressionStream::doBlockCompression() {
- return ZSTD_compress(compressorBuffer.data(),
- compressorBuffer.size(),
- rawInputBuffer.data(),
- static_cast<size_t>(bufferSize),
- level);
+ return ZSTD_compressCCtx(cctx,
+ compressorBuffer.data(),
+ compressorBuffer.size(),
+ rawInputBuffer.data(),
+ static_cast<size_t>(bufferSize),
+ level);
}
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+ DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+ void ZSTDCompressionStream::init() {
+
+ cctx = ZSTD_createCCtx();
+ if (!cctx) {
+ throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd.");
+ }
+ }
+
+
+ void ZSTDCompressionStream::end() {
+ (void)ZSTD_freeCCtx(cctx);
+ cctx = nullptr;
+ }
+
+DIAGNOSTIC_PUSH
/**
* ZSTD block decompression
@@ -1065,7 +1097,11 @@ DIAGNOSTIC_POP
: BlockDecompressionStream(std::move(inStream),
blockSize,
pool) {
- // PASS
+ this->init();
+ }
+
+ virtual ~ZSTDDecompressionStream() override {
+ this->end();
}
std::string getName() const override {
@@ -1079,18 +1115,46 @@ DIAGNOSTIC_POP
uint64_t length,
char *output,
size_t maxOutputLength) override;
+
+ private:
+ void init();
+ void end();
+ ZSTD_DCtx *dctx;
};
uint64_t ZSTDDecompressionStream::decompress(const char *input,
uint64_t length,
char *output,
size_t maxOutputLength) {
- return static_cast<uint64_t>(ZSTD_decompress(output,
- maxOutputLength,
- input,
- length));
+ return static_cast<uint64_t>(ZSTD_decompressDCtx(dctx,
+ output,
+ maxOutputLength,
+ input,
+ length));
+ }
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+ DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+ void ZSTDDecompressionStream::init() {
+
+ dctx = ZSTD_createDCtx();
+ if (!dctx) {
+ throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd.");
+ }
}
+
+ void ZSTDDecompressionStream::end() {
+ (void)ZSTD_freeDCtx(dctx);
+ dctx = nullptr;
+ }
+
+DIAGNOSTIC_PUSH
+
std::unique_ptr<BufferedOutputStream>
createCompressor(
CompressionKind kind,