You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/11 05:58:00 UTC
arrow git commit: ARROW-1186: [C++] Add support to build only Parquet dependencies
Repository: arrow
Updated Branches:
refs/heads/master 845207118 -> dbedc8d20
ARROW-1186: [C++] Add support to build only Parquet dependencies
Author: Deepak Majeti <de...@hpe.com>
Closes #815 from majetideepak/ARROW-1186 and squashes the following commits:
1d3ac224 [Deepak Majeti] fix srcs and libs
ed7db98b [Deepak Majeti] Remove extra headers
f1761920 [Deepak Majeti] Review comments
46a52dd2 [Deepak Majeti] disable minimal parquet build if tests and benchmarks are built
1640f209 [Deepak Majeti] Add support to build only Parquet dependencies
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/dbedc8d2
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/dbedc8d2
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/dbedc8d2
Branch: refs/heads/master
Commit: dbedc8d20d9936076cbf47d8ef1ef893b8e05f45
Parents: 8452071
Author: Deepak Majeti <de...@hpe.com>
Authored: Tue Jul 11 01:57:55 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jul 11 01:57:55 2017 -0400
----------------------------------------------------------------------
cpp/CMakeLists.txt | 36 ++-
cpp/cmake_modules/ThirdpartyToolchain.cmake | 132 ++++-----
cpp/src/arrow/util/compression.cc | 333 -----------------------
cpp/src/arrow/util/compression_brotli.cc | 63 +++++
cpp/src/arrow/util/compression_lz4.cc | 57 ++++
cpp/src/arrow/util/compression_snappy.cc | 63 +++++
cpp/src/arrow/util/compression_zlib.cc | 245 +++++++++++++++++
cpp/src/arrow/util/compression_zstd.cc | 59 ++++
8 files changed, 588 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index e67c7f6..77f1e59 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -142,6 +142,14 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
"Build with SSE4 optimizations"
OFF)
+ option(ARROW_WITH_LZ4
+ "Build with lz4 compression"
+ ON)
+
+ option(ARROW_WITH_ZSTD
+ "Build with zstd compression"
+ ON)
+
option(ARROW_ZLIB_VENDORED
"Build our own zlib (some libz.a aren't configured for static linking)"
ON)
@@ -161,12 +169,17 @@ endif()
if(ARROW_BUILD_TESTS)
set(ARROW_BUILD_STATIC ON)
+ set(ARROW_WITH_LZ4 ON)
+ set(ARROW_WITH_ZSTD ON)
else()
set(NO_TESTS 1)
endif()
if(NOT ARROW_BUILD_BENCHMARKS)
set(NO_BENCHMARKS 1)
+else()
+ set(ARROW_WITH_LZ4 ON)
+ set(ARROW_WITH_ZSTD ON)
endif()
if(ARROW_HDFS)
@@ -536,9 +549,15 @@ set(ARROW_STATIC_LINK_LIBS
brotli_enc
brotli_common
snappy
- zlib
- zstd_static
- lz4_static)
+ zlib)
+
+if (ARROW_WITH_LZ4)
+ SET(ARROW_STATIC_LINK_LIBS lz4_static ${ARROW_STATIC_LINK_LIBS})
+endif()
+
+if (ARROW_WITH_ZSTD)
+ SET(ARROW_STATIC_LINK_LIBS zstd_static ${ARROW_STATIC_LINK_LIBS})
+endif()
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
@@ -664,11 +683,22 @@ set(ARROW_SRCS
src/arrow/util/bit-util.cc
src/arrow/util/compression.cc
+ src/arrow/util/compression_brotli.cc
+ src/arrow/util/compression_snappy.cc
+ src/arrow/util/compression_zlib.cc
src/arrow/util/cpu-info.cc
src/arrow/util/decimal.cc
src/arrow/util/key_value_metadata.cc
)
+if (ARROW_WITH_LZ4)
+ SET(ARROW_SRCS src/arrow/util/compression_lz4.cc ${ARROW_SRCS})
+endif()
+
+if (ARROW_WITH_ZSTD)
+ SET(ARROW_SRCS src/arrow/util/compression_zstd.cc ${ARROW_SRCS})
+endif()
+
if (NOT ARROW_BOOST_HEADER_ONLY)
set(ARROW_SRCS ${ARROW_SRCS}
src/arrow/io/hdfs.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 74939ac..ab8fc5d 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -615,82 +615,86 @@ if (BROTLI_VENDORED)
add_dependencies(brotli_common brotli_ep)
endif()
+if (ARROW_WITH_LZ4)
# ----------------------------------------------------------------------
# Lz4
-find_package(Lz4)
-if (NOT LZ4_FOUND)
- set(LZ4_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/lz4_ep-prefix/src/lz4_ep")
- set(LZ4_INCLUDE_DIR "${LZ4_BUILD_DIR}/lib")
-
- if (MSVC)
- set(LZ4_STATIC_LIB "${LZ4_BUILD_DIR}/visual/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/liblz4_static.lib")
- set(LZ4_BUILD_COMMAND BUILD_COMMAND msbuild.exe /m /p:Configuration=${CMAKE_BUILD_TYPE} /p:Platform=x64 /p:PlatformToolset=v140 /t:Build ${LZ4_BUILD_DIR}/visual/VS2010/lz4.sln)
+ find_package(Lz4)
+ if (NOT LZ4_FOUND)
+ set(LZ4_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/lz4_ep-prefix/src/lz4_ep")
+ set(LZ4_INCLUDE_DIR "${LZ4_BUILD_DIR}/lib")
+
+ if (MSVC)
+ set(LZ4_STATIC_LIB "${LZ4_BUILD_DIR}/visual/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/liblz4_static.lib")
+ set(LZ4_BUILD_COMMAND BUILD_COMMAND msbuild.exe /m /p:Configuration=${CMAKE_BUILD_TYPE} /p:Platform=x64 /p:PlatformToolset=v140 /t:Build ${LZ4_BUILD_DIR}/visual/VS2010/lz4.sln)
+ else()
+ set(LZ4_STATIC_LIB "${LZ4_BUILD_DIR}/lib/liblz4.a")
+ set(LZ4_BUILD_COMMAND BUILD_COMMAND ${CMAKE_SOURCE_DIR}/build-support/build-lz4-lib.sh)
+ endif()
+
+ ExternalProject_Add(lz4_ep
+ URL "https://github.com/lz4/lz4/archive/v${LZ4_VERSION}.tar.gz"
+ UPDATE_COMMAND ""
+ PATCH_COMMAND ""
+ CONFIGURE_COMMAND ""
+ INSTALL_COMMAND ""
+ BINARY_DIR ${LZ4_BUILD_DIR}
+ BUILD_BYPRODUCTS ${LZ4_STATIC_LIB}
+ ${LZ4_BUILD_COMMAND}
+ )
+
+ set(LZ4_VENDORED 1)
else()
- set(LZ4_STATIC_LIB "${LZ4_BUILD_DIR}/lib/liblz4.a")
- set(LZ4_BUILD_COMMAND BUILD_COMMAND ${CMAKE_SOURCE_DIR}/build-support/build-lz4-lib.sh)
+ set(LZ4_VENDORED 0)
+ endif()
+
+ include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
+ ADD_THIRDPARTY_LIB(lz4_static
+ STATIC_LIB ${LZ4_STATIC_LIB})
+
+ if (LZ4_VENDORED)
+ add_dependencies(lz4_static lz4_ep)
endif()
-
- ExternalProject_Add(lz4_ep
- URL "https://github.com/lz4/lz4/archive/v${LZ4_VERSION}.tar.gz"
- UPDATE_COMMAND ""
- PATCH_COMMAND ""
- CONFIGURE_COMMAND ""
- INSTALL_COMMAND ""
- BINARY_DIR ${LZ4_BUILD_DIR}
- BUILD_BYPRODUCTS ${LZ4_STATIC_LIB}
- ${LZ4_BUILD_COMMAND}
- )
-
- set(LZ4_VENDORED 1)
-else()
- set(LZ4_VENDORED 0)
-endif()
-
-include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
-ADD_THIRDPARTY_LIB(lz4_static
- STATIC_LIB ${LZ4_STATIC_LIB})
-
-if (LZ4_VENDORED)
- add_dependencies(lz4_static lz4_ep)
endif()
-
+
+if (ARROW_WITH_ZSTD)
# ----------------------------------------------------------------------
# ZSTD
-find_package(ZSTD)
-if (NOT ZSTD_FOUND)
- set(ZSTD_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/zstd_ep-prefix/src/zstd_ep")
- set(ZSTD_INCLUDE_DIR "${ZSTD_BUILD_DIR}/lib")
+ find_package(ZSTD)
+ if (NOT ZSTD_FOUND)
+ set(ZSTD_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/zstd_ep-prefix/src/zstd_ep")
+ set(ZSTD_INCLUDE_DIR "${ZSTD_BUILD_DIR}/lib")
- if (MSVC)
- set(ZSTD_STATIC_LIB "${ZSTD_BUILD_DIR}/build/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/libzstd_static.lib")
- set(ZSTD_BUILD_COMMAND BUILD_COMMAND msbuild ${ZSTD_BUILD_DIR}/build/VS2010/zstd.sln /t:Build /v:minimal /p:Configuration=${CMAKE_BUILD_TYPE} /p:Platform=x64 /p:PlatformToolset=v140 /p:OutDir=${ZSTD_BUILD_DIR}/build/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/ /p:SolutionDir=${ZSTD_BUILD_DIR}/build/VS2010/ )
+ if (MSVC)
+ set(ZSTD_STATIC_LIB "${ZSTD_BUILD_DIR}/build/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/libzstd_static.lib")
+ set(ZSTD_BUILD_COMMAND BUILD_COMMAND msbuild ${ZSTD_BUILD_DIR}/build/VS2010/zstd.sln /t:Build /v:minimal /p:Configuration=${CMAKE_BUILD_TYPE} /p:Platform=x64 /p:PlatformToolset=v140 /p:OutDir=${ZSTD_BUILD_DIR}/build/VS2010/bin/x64_${CMAKE_BUILD_TYPE}/ /p:SolutionDir=${ZSTD_BUILD_DIR}/build/VS2010/ )
+ else()
+ set(ZSTD_STATIC_LIB "${ZSTD_BUILD_DIR}/lib/libzstd.a")
+ set(ZSTD_BUILD_COMMAND BUILD_COMMAND ${CMAKE_SOURCE_DIR}/build-support/build-zstd-lib.sh)
+ endif()
+
+ ExternalProject_Add(zstd_ep
+ URL "https://github.com/facebook/zstd/archive/v${ZSTD_VERSION}.tar.gz"
+ UPDATE_COMMAND ""
+ PATCH_COMMAND ""
+ CONFIGURE_COMMAND ""
+ INSTALL_COMMAND ""
+ BINARY_DIR ${ZSTD_BUILD_DIR}
+ BUILD_BYPRODUCTS ${ZSTD_STATIC_LIB}
+ ${ZSTD_BUILD_COMMAND}
+ )
+
+ set(ZSTD_VENDORED 1)
else()
- set(ZSTD_STATIC_LIB "${ZSTD_BUILD_DIR}/lib/libzstd.a")
- set(ZSTD_BUILD_COMMAND BUILD_COMMAND ${CMAKE_SOURCE_DIR}/build-support/build-zstd-lib.sh)
+ set(ZSTD_VENDORED 0)
endif()
- ExternalProject_Add(zstd_ep
- URL "https://github.com/facebook/zstd/archive/v${ZSTD_VERSION}.tar.gz"
- UPDATE_COMMAND ""
- PATCH_COMMAND ""
- CONFIGURE_COMMAND ""
- INSTALL_COMMAND ""
- BINARY_DIR ${ZSTD_BUILD_DIR}
- BUILD_BYPRODUCTS ${ZSTD_STATIC_LIB}
- ${ZSTD_BUILD_COMMAND}
- )
-
- set(ZSTD_VENDORED 1)
-else()
- set(ZSTD_VENDORED 0)
-endif()
-
-include_directories(SYSTEM ${ZSTD_INCLUDE_DIR})
-ADD_THIRDPARTY_LIB(zstd_static
- STATIC_LIB ${ZSTD_STATIC_LIB})
+ include_directories(SYSTEM ${ZSTD_INCLUDE_DIR})
+ ADD_THIRDPARTY_LIB(zstd_static
+ STATIC_LIB ${ZSTD_STATIC_LIB})
-if (ZSTD_VENDORED)
- add_dependencies(zstd_static zstd_ep)
+ if (ZSTD_VENDORED)
+ add_dependencies(zstd_static zstd_ep)
+ endif()
endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index 0f17e7c..4681488 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -17,23 +17,11 @@
#include "arrow/util/compression.h"
-// Work around warning caused by Snappy include
-#ifdef DISALLOW_COPY_AND_ASSIGN
-#undef DISALLOW_COPY_AND_ASSIGN
-#endif
-
#include <cstdint>
#include <memory>
#include <sstream>
#include <string>
-#include <brotli/decode.h>
-#include <brotli/encode.h>
-#include <lz4.h>
-#include <snappy.h>
-#include <zlib.h>
-#include <zstd.h>
-
#include "arrow/status.h"
#include "arrow/util/logging.h"
@@ -62,325 +50,4 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr<Codec>* resul
return Status::OK();
}
-// ----------------------------------------------------------------------
-// gzip implementation
-
-// These are magic numbers from zlib.h. Not clear why they are not defined
-// there.
-
-// Maximum window size
-static constexpr int WINDOW_BITS = 15;
-
-// Output Gzip.
-static constexpr int GZIP_CODEC = 16;
-
-// Determine if this is libz or gzip from header.
-static constexpr int DETECT_CODEC = 32;
-
-class GZipCodec::GZipCodecImpl {
- public:
- explicit GZipCodecImpl(GZipCodec::Format format)
- : format_(format),
- compressor_initialized_(false),
- decompressor_initialized_(false) {}
-
- ~GZipCodecImpl() {
- EndCompressor();
- EndDecompressor();
- }
-
- Status InitCompressor() {
- EndDecompressor();
- memset(&stream_, 0, sizeof(stream_));
-
- int ret;
- // Initialize to run specified format
- int window_bits = WINDOW_BITS;
- if (format_ == DEFLATE) {
- window_bits = -window_bits;
- } else if (format_ == GZIP) {
- window_bits += GZIP_CODEC;
- }
- if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9,
- Z_DEFAULT_STRATEGY)) != Z_OK) {
- std::stringstream ss;
- ss << "zlib deflateInit failed: " << std::string(stream_.msg);
- return Status::IOError(ss.str());
- }
- compressor_initialized_ = true;
- return Status::OK();
- }
-
- void EndCompressor() {
- if (compressor_initialized_) { (void)deflateEnd(&stream_); }
- compressor_initialized_ = false;
- }
-
- Status InitDecompressor() {
- EndCompressor();
- memset(&stream_, 0, sizeof(stream_));
- int ret;
-
- // Initialize to run either deflate or zlib/gzip format
- int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
- if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
- std::stringstream ss;
- ss << "zlib inflateInit failed: " << std::string(stream_.msg);
- return Status::IOError(ss.str());
- }
- decompressor_initialized_ = true;
- return Status::OK();
- }
-
- void EndDecompressor() {
- if (decompressor_initialized_) { (void)inflateEnd(&stream_); }
- decompressor_initialized_ = false;
- }
-
- Status Decompress(int64_t input_length, const uint8_t* input, int64_t output_length,
- uint8_t* output) {
- if (!decompressor_initialized_) { RETURN_NOT_OK(InitDecompressor()); }
- if (output_length == 0) {
- // The zlib library does not allow *output to be NULL, even when output_length
- // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
- // error, so bail early if no output is expected. Note that we don't signal
- // an error if the input actually contains compressed data.
- return Status::OK();
- }
-
- // Reset the stream for this block
- if (inflateReset(&stream_) != Z_OK) {
- std::stringstream ss;
- ss << "zlib inflateReset failed: " << std::string(stream_.msg);
- return Status::IOError(ss.str());
- }
-
- int ret = 0;
- // gzip can run in streaming mode or non-streaming mode. We only
- // support the non-streaming use case where we present it the entire
- // compressed input and a buffer big enough to contain the entire
- // compressed output. In the case where we don't know the output,
- // we just make a bigger buffer and try the non-streaming mode
- // from the beginning again.
- while (ret != Z_STREAM_END) {
- stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
- stream_.avail_in = static_cast<uInt>(input_length);
- stream_.next_out = reinterpret_cast<Bytef*>(output);
- stream_.avail_out = static_cast<uInt>(output_length);
-
- // We know the output size. In this case, we can use Z_FINISH
- // which is more efficient.
- ret = inflate(&stream_, Z_FINISH);
- if (ret == Z_STREAM_END || ret != Z_OK) break;
-
- // Failure, buffer was too small
- std::stringstream ss;
- ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length
- << " OutputLength=" << output_length;
- return Status::IOError(ss.str());
- }
-
- // Failure for some other reason
- if (ret != Z_STREAM_END) {
- std::stringstream ss;
- ss << "GZipCodec failed: ";
- if (stream_.msg != NULL) ss << stream_.msg;
- return Status::IOError(ss.str());
- }
- return Status::OK();
- }
-
- int64_t MaxCompressedLen(int64_t input_length, const uint8_t* input) {
- // Most be in compression mode
- if (!compressor_initialized_) {
- Status s = InitCompressor();
- DCHECK(s.ok());
- }
- // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
- return deflateBound(&stream_, static_cast<uLong>(input_length));
- }
-
- Status Compress(int64_t input_length, const uint8_t* input, int64_t output_buffer_len,
- uint8_t* output, int64_t* output_length) {
- if (!compressor_initialized_) { RETURN_NOT_OK(InitCompressor()); }
- stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
- stream_.avail_in = static_cast<uInt>(input_length);
- stream_.next_out = reinterpret_cast<Bytef*>(output);
- stream_.avail_out = static_cast<uInt>(output_buffer_len);
-
- int64_t ret = 0;
- if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
- if (ret == Z_OK) {
- // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
- // small
- return Status::IOError("zlib deflate failed, output buffer too small");
- }
- std::stringstream ss;
- ss << "zlib deflate failed: " << stream_.msg;
- return Status::IOError(ss.str());
- }
-
- if (deflateReset(&stream_) != Z_OK) {
- std::stringstream ss;
- ss << "zlib deflateReset failed: " << std::string(stream_.msg);
- return Status::IOError(ss.str());
- }
-
- // Actual output length
- *output_length = output_buffer_len - stream_.avail_out;
- return Status::OK();
- }
-
- private:
- // zlib is stateful and the z_stream state variable must be initialized
- // before
- z_stream stream_;
-
- // Realistically, this will always be GZIP, but we leave the option open to
- // configure
- GZipCodec::Format format_;
-
- // These variables are mutually exclusive. When the codec is in "compressor"
- // state, compressor_initialized_ is true while decompressor_initialized_ is
- // false. When it's decompressing, the opposite is true.
- //
- // Indeed, this is slightly hacky, but the alternative is having separate
- // Compressor and Decompressor classes. If this ever becomes an issue, we can
- // perform the refactoring then
- bool compressor_initialized_;
- bool decompressor_initialized_;
-};
-
-GZipCodec::GZipCodec(Format format) {
- impl_.reset(new GZipCodecImpl(format));
-}
-
-GZipCodec::~GZipCodec() {}
-
-Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output) {
- return impl_->Decompress(input_length, input, output_buffer_len, output);
-}
-
-int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
- return impl_->MaxCompressedLen(input_length, input);
-}
-
-Status GZipCodec::Compress(int64_t input_length, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output, int64_t* output_length) {
- return impl_->Compress(input_length, input, output_buffer_len, output, output_length);
-}
-
-const char* GZipCodec::name() const {
- return "gzip";
-}
-
-// ----------------------------------------------------------------------
-// Snappy implementation
-
-Status SnappyCodec::Decompress(
- int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
- if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
- static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) {
- return Status::IOError("Corrupt snappy compressed data.");
- }
- return Status::OK();
-}
-
-int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
- return snappy::MaxCompressedLength(input_len);
-}
-
-Status SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
- size_t output_len;
- snappy::RawCompress(reinterpret_cast<const char*>(input),
- static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),
- &output_len);
- *output_length = static_cast<int64_t>(output_len);
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Brotli implementation
-
-Status BrotliCodec::Decompress(
- int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
- size_t output_size = output_len;
- if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) !=
- BROTLI_DECODER_RESULT_SUCCESS) {
- return Status::IOError("Corrupt brotli compressed data.");
- }
- return Status::OK();
-}
-
-int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
- return BrotliEncoderMaxCompressedSize(input_len);
-}
-
-Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
- size_t output_len = output_buffer_len;
- // TODO: Make quality configurable. We use 8 as a default as it is the best
- // trade-off for Parquet workload
- if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len,
- input, &output_len, output_buffer) == BROTLI_FALSE) {
- return Status::IOError("Brotli compression failure.");
- }
- *output_length = output_len;
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// ZSTD implementation
-
-Status ZSTDCodec::Decompress(
- int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
- int64_t decompressed_size = ZSTD_decompress(output_buffer,
- static_cast<size_t>(output_len), input, static_cast<size_t>(input_len));
- if (decompressed_size != output_len) {
- return Status::IOError("Corrupt ZSTD compressed data.");
- }
- return Status::OK();
-}
-
-int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
- return ZSTD_compressBound(input_len);
-}
-
-Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
- *output_length = ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len),
- input, static_cast<size_t>(input_len), 1);
- if (ZSTD_isError(*output_length)) {
- return Status::IOError("ZSTD compression failure.");
- }
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Lz4 implementation
-
-Status Lz4Codec::Decompress(
- int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
- int64_t decompressed_size = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
- reinterpret_cast<char*>(output_buffer), static_cast<int>(input_len),
- static_cast<int>(output_len));
- if (decompressed_size < 1) { return Status::IOError("Corrupt Lz4 compressed data."); }
- return Status::OK();
-}
-
-int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
- return LZ4_compressBound(static_cast<int>(input_len));
-}
-
-Status Lz4Codec::Compress(int64_t input_len, const uint8_t* input,
- int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
- *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
- reinterpret_cast<char*>(output_buffer), static_cast<int>(input_len),
- static_cast<int>(output_buffer_len));
- if (*output_length < 1) { return Status::IOError("Lz4 compression failure."); }
- return Status::OK();
-}
-
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression_brotli.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
new file mode 100644
index 0000000..f5f9f57
--- /dev/null
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/compression.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <brotli/decode.h>
+#include <brotli/encode.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Brotli implementation
+
+Status BrotliCodec::Decompress(
+ int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+ size_t output_size = output_len;
+ if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) !=
+ BROTLI_DECODER_RESULT_SUCCESS) {
+ return Status::IOError("Corrupt brotli compressed data.");
+ }
+ return Status::OK();
+}
+
+int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+ return BrotliEncoderMaxCompressedSize(input_len);
+}
+
+Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
+ size_t output_len = output_buffer_len;
+ // TODO: Make quality configurable. We use 8 as a default as it is the best
+ // trade-off for Parquet workload
+ if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len,
+ input, &output_len, output_buffer) == BROTLI_FALSE) {
+ return Status::IOError("Brotli compression failure.");
+ }
+ *output_length = output_len;
+ return Status::OK();
+}
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression_lz4.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
new file mode 100644
index 0000000..27094f1
--- /dev/null
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/compression.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <lz4.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Lz4 implementation
+
+Status Lz4Codec::Decompress(
+ int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+ int64_t decompressed_size = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
+ reinterpret_cast<char*>(output_buffer), static_cast<int>(input_len),
+ static_cast<int>(output_len));
+ if (decompressed_size < 1) { return Status::IOError("Corrupt Lz4 compressed data."); }
+ return Status::OK();
+}
+
+int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+ return LZ4_compressBound(static_cast<int>(input_len));
+}
+
+Status Lz4Codec::Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
+ *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
+ reinterpret_cast<char*>(output_buffer), static_cast<int>(input_len),
+ static_cast<int>(output_buffer_len));
+ if (*output_length < 1) { return Status::IOError("Lz4 compression failure."); }
+ return Status::OK();
+}
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression_snappy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
new file mode 100644
index 0000000..ab418b0
--- /dev/null
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/compression.h"
+
+// Work around warning caused by Snappy include
+#ifdef DISALLOW_COPY_AND_ASSIGN
+#undef DISALLOW_COPY_AND_ASSIGN
+#endif
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <snappy.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Snappy implementation
+
+Status SnappyCodec::Decompress(
+ int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+ if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
+ static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) {
+ return Status::IOError("Corrupt snappy compressed data.");
+ }
+ return Status::OK();
+}
+
+int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+ return snappy::MaxCompressedLength(input_len);
+}
+
+Status SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
+ size_t output_len;
+ snappy::RawCompress(reinterpret_cast<const char*>(input),
+ static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),
+ &output_len);
+ *output_length = static_cast<int64_t>(output_len);
+ return Status::OK();
+}
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression_zlib.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
new file mode 100644
index 0000000..934ea1b
--- /dev/null
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/compression.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <zlib.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// gzip implementation
+
+// zlib's windowBits parameter encodes both the window size and the stream
+// wrapper, but zlib.h offers no named constants for the values involved, so
+// they are spelled out here.
+namespace {
+
+// Largest window size (as a base-2 logarithm).
+constexpr int WINDOW_BITS = 15;
+
+// Added to the window bits to make the compressor emit gzip output.
+constexpr int GZIP_CODEC = 16;
+
+// Added to the window bits so that the decompressor determines from the
+// header whether the stream is zlib or gzip.
+constexpr int DETECT_CODEC = 32;
+
+}  // namespace
+
+// Private implementation of GZipCodec on top of zlib's deflate/inflate API.
+//
+// One z_stream is shared between compression and decompression. It is
+// initialized lazily, and re-initialized whenever the direction of use
+// changes (see InitCompressor / InitDecompressor).
+class GZipCodec::GZipCodecImpl {
+ public:
+  explicit GZipCodecImpl(GZipCodec::Format format)
+      : format_(format),
+        compressor_initialized_(false),
+        decompressor_initialized_(false) {}
+
+  ~GZipCodecImpl() {
+    EndCompressor();
+    EndDecompressor();
+  }
+
+  // Switch the shared stream into compression mode, releasing any active
+  // decompressor state first. Returns IOError if zlib fails to initialize.
+  Status InitCompressor() {
+    EndDecompressor();
+    memset(&stream_, 0, sizeof(stream_));
+
+    // Initialize to run specified format
+    int window_bits = WINDOW_BITS;
+    if (format_ == DEFLATE) {
+      window_bits = -window_bits;
+    } else if (format_ == GZIP) {
+      window_bits += GZIP_CODEC;
+    }
+    // The literal 9 is the memLevel argument of deflateInit2.
+    if (deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9,
+            Z_DEFAULT_STRATEGY) != Z_OK) {
+      return ZlibError("zlib deflateInit failed: ");
+    }
+    compressor_initialized_ = true;
+    return Status::OK();
+  }
+
+  // Release compressor state, if any. Safe to call repeatedly.
+  void EndCompressor() {
+    if (compressor_initialized_) { (void)deflateEnd(&stream_); }
+    compressor_initialized_ = false;
+  }
+
+  // Switch the shared stream into decompression mode, releasing any active
+  // compressor state first. Returns IOError if zlib fails to initialize.
+  Status InitDecompressor() {
+    EndCompressor();
+    memset(&stream_, 0, sizeof(stream_));
+
+    // Initialize to run either deflate or zlib/gzip format
+    int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
+    if (inflateInit2(&stream_, window_bits) != Z_OK) {
+      return ZlibError("zlib inflateInit failed: ");
+    }
+    decompressor_initialized_ = true;
+    return Status::OK();
+  }
+
+  // Release decompressor state, if any. Safe to call repeatedly.
+  void EndDecompressor() {
+    if (decompressor_initialized_) { (void)inflateEnd(&stream_); }
+    decompressor_initialized_ = false;
+  }
+
+  // Decompress the whole of `input` into `output` in a single inflate() call.
+  //
+  // output_length is the capacity of `output`; it must be large enough to hold
+  // the entire decompressed data. Returns IOError if the buffer is too small
+  // or if zlib reports any other failure.
+  //
+  // NOTE(review): the lengths are narrowed to zlib's uInt; inputs or buffers
+  // larger than uInt can represent are not handled here.
+  Status Decompress(int64_t input_length, const uint8_t* input, int64_t output_length,
+      uint8_t* output) {
+    if (!decompressor_initialized_) { RETURN_NOT_OK(InitDecompressor()); }
+    if (output_length == 0) {
+      // The zlib library does not allow *output to be NULL, even when output_length
+      // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
+      // error, so bail early if no output is expected. Note that we don't signal
+      // an error if the input actually contains compressed data.
+      return Status::OK();
+    }
+
+    // Reset the stream for this block
+    if (inflateReset(&stream_) != Z_OK) {
+      return ZlibError("zlib inflateReset failed: ");
+    }
+
+    // gzip can run in streaming mode or non-streaming mode. We only
+    // support the non-streaming use case where we present it the entire
+    // compressed input and a buffer big enough to contain the entire
+    // decompressed output.
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = static_cast<uInt>(input_length);
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = static_cast<uInt>(output_length);
+
+    // We know the output size. In this case, we can use Z_FINISH
+    // which is more efficient.
+    const int ret = inflate(&stream_, Z_FINISH);
+    if (ret == Z_STREAM_END) { return Status::OK(); }
+
+    // With Z_FINISH, zlib reports an exhausted output buffer as Z_BUF_ERROR
+    // rather than Z_OK, so both are treated as "buffer too small" when no
+    // output space is left.
+    if (ret == Z_OK || (ret == Z_BUF_ERROR && stream_.avail_out == 0)) {
+      std::stringstream ss;
+      ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length
+         << " OutputLength=" << output_length;
+      return Status::IOError(ss.str());
+    }
+
+    // Failure for some other reason
+    return ZlibError("GZipCodec failed: ");
+  }
+
+  // Upper bound on the compressed size of input_length bytes, as reported by
+  // deflateBound(). Initializes the compressor if necessary.
+  int64_t MaxCompressedLen(int64_t input_length, const uint8_t* input) {
+    // Must be in compression mode
+    if (!compressor_initialized_) {
+      Status s = InitCompressor();
+      DCHECK(s.ok());
+    }
+    // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
+    return deflateBound(&stream_, static_cast<uLong>(input_length));
+  }
+
+  // Compress the whole of `input` into `output` in a single deflate() call and
+  // store the number of bytes written in *output_length.
+  //
+  // Returns IOError if output_buffer_len is too small or zlib fails; in that
+  // case *output_length is left untouched.
+  //
+  // NOTE(review): the lengths are narrowed to zlib's uInt; inputs or buffers
+  // larger than uInt can represent are not handled here.
+  Status Compress(int64_t input_length, const uint8_t* input, int64_t output_buffer_len,
+      uint8_t* output, int64_t* output_length) {
+    if (!compressor_initialized_) { RETURN_NOT_OK(InitCompressor()); }
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = static_cast<uInt>(input_length);
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = static_cast<uInt>(output_buffer_len);
+
+    const int ret = deflate(&stream_, Z_FINISH);
+    if (ret != Z_STREAM_END) {
+      // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
+      // small
+      Status failure = (ret == Z_OK)
+                           ? Status::IOError("zlib deflate failed, output buffer too small")
+                           : ZlibError("zlib deflate failed: ");
+      // Drop the unfinished stream so that the codec can be used again. This
+      // must happen after the error is built because resetting clears msg.
+      (void)deflateReset(&stream_);
+      return failure;
+    }
+
+    // Actual output length; read it before the stream is reset.
+    const int64_t bytes_written = output_buffer_len - stream_.avail_out;
+
+    if (deflateReset(&stream_) != Z_OK) {
+      return ZlibError("zlib deflateReset failed: ");
+    }
+
+    *output_length = bytes_written;
+    return Status::OK();
+  }
+
+ private:
+  // Build an IOError made of `prefix` followed by zlib's current message.
+  // stream_.msg may be NULL, in which case only the prefix is reported.
+  Status ZlibError(const char* prefix) const {
+    std::stringstream ss;
+    ss << prefix;
+    if (stream_.msg != NULL) { ss << stream_.msg; }
+    return Status::IOError(ss.str());
+  }
+
+  // zlib is stateful and the z_stream state variable must be initialized
+  // before use
+  z_stream stream_;
+
+  // Realistically, this will always be GZIP, but we leave the option open to
+  // configure
+  GZipCodec::Format format_;
+
+  // These variables are mutually exclusive. When the codec is in "compressor"
+  // state, compressor_initialized_ is true while decompressor_initialized_ is
+  // false. When it's decompressing, the opposite is true.
+  //
+  // Indeed, this is slightly hacky, but the alternative is having separate
+  // Compressor and Decompressor classes. If this ever becomes an issue, we can
+  // perform the refactoring then
+  bool compressor_initialized_;
+  bool decompressor_initialized_;
+};
+
+// Create a codec whose zlib state lives in a private GZipCodecImpl configured
+// for the requested format.
+GZipCodec::GZipCodec(Format format) : impl_(new GZipCodecImpl(format)) {}
+
+// Defined in this file, where GZipCodecImpl is a complete type.
+GZipCodec::~GZipCodec() = default;
+
+// Forwards to the private implementation; see GZipCodecImpl::Decompress.
+Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output) {
+  GZipCodecImpl& impl = *impl_;
+  return impl.Decompress(input_length, input, output_buffer_len, output);
+}
+
+// Forwards to the private implementation; see GZipCodecImpl::MaxCompressedLen.
+int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
+  GZipCodecImpl& impl = *impl_;
+  return impl.MaxCompressedLen(input_length, input);
+}
+
+// Forwards to the private implementation; see GZipCodecImpl::Compress.
+Status GZipCodec::Compress(int64_t input_length, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output, int64_t* output_length) {
+  GZipCodecImpl& impl = *impl_;
+  return impl.Compress(input_length, input, output_buffer_len, output, output_length);
+}
+
+// Short identifier of this codec.
+const char* GZipCodec::name() const {
+  static const char kCodecName[] = "gzip";
+  return kCodecName;
+}
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dbedc8d2/cpp/src/arrow/util/compression_zstd.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
new file mode 100644
index 0000000..a03a3fc
--- /dev/null
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/compression.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <zstd.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ZSTD implementation
+
+// Decompress a ZSTD frame into a caller-provided output buffer.
+//
+// input_len / input: size in bytes of, and pointer to, the compressed data.
+// output_len / output_buffer: size in bytes of, and pointer to, the
+// destination; the decompressed data must be exactly output_len bytes long.
+//
+// Returns Status::IOError if ZSTD reports an error or if the decompressed
+// size differs from output_len.
+Status ZSTDCodec::Decompress(
+    int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+  // ZSTD_decompress returns a size_t that is either the number of bytes
+  // written or an error code; it has to be classified with ZSTD_isError
+  // before being treated as a length.
+  const size_t result = ZSTD_decompress(output_buffer, static_cast<size_t>(output_len),
+      input, static_cast<size_t>(input_len));
+  if (ZSTD_isError(result)) {
+    std::stringstream ss;
+    ss << "ZSTD decompression failed: " << ZSTD_getErrorName(result);
+    return Status::IOError(ss.str());
+  }
+  if (static_cast<int64_t>(result) != output_len) {
+    return Status::IOError("Corrupt ZSTD compressed data.");
+  }
+  return Status::OK();
+}
+
+// Upper bound, in bytes, on the size of the ZSTD output for an input of
+// input_len bytes. The input pointer itself is not consulted.
+int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+  const size_t upper_bound = ZSTD_compressBound(static_cast<size_t>(input_len));
+  return static_cast<int64_t>(upper_bound);
+}
+
+// Compress input_len bytes from input into output_buffer (capacity
+// output_buffer_len) and store the number of bytes written in *output_length.
+//
+// Returns Status::IOError if ZSTD reports an error; in that case
+// *output_length is left untouched.
+Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_length) {
+  // Compression level passed to ZSTD_compress.
+  const int kCompressionLevel = 1;
+
+  // Keep the raw size_t result: it is either a byte count or an error code and
+  // must be classified with ZSTD_isError before being exposed as a length.
+  const size_t result =
+      ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len), input,
+          static_cast<size_t>(input_len), kCompressionLevel);
+  if (ZSTD_isError(result)) {
+    std::stringstream ss;
+    ss << "ZSTD compression failure: " << ZSTD_getErrorName(result);
+    return Status::IOError(ss.str());
+  }
+  *output_length = static_cast<int64_t>(result);
+  return Status::OK();
+}
+
+} // namespace arrow