You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/13 00:38:52 UTC
parquet-cpp git commit: PARQUET-456: Finish gzip implementation and
unit test all compressors
Repository: parquet-cpp
Updated Branches:
refs/heads/master 5f3499c60 -> 05cd4ec28
PARQUET-456: Finish gzip implementation and unit test all compressors
We should perhaps separate the compression and decompression code (as in Impala), as gzip is more stateful than the other compressors.
Closes #11 when merged.
Author: Wes McKinney <we...@cloudera.com>
Author: Konstantin Knizhnik <kn...@garret.ru>
Closes #48 from wesm/PARQUET-456 and squashes the following commits:
5aeba2a [Wes McKinney] Comment typo
8e1f8f2 [Wes McKinney] Move test run to shell script and enable OS X
633fd71 [Wes McKinney] Port gzip codec code from Impala, expand tests, get them to pass
a8d3c11 [Wes McKinney] Add compression round-trip test, gzip needs a bunch more work though
0bc8cf7 [Wes McKinney] Fix PATH_SUFFIXES for zlib
69548c9 [Konstantin Knizhnik] Add zlib to thirdparty build toolchain for compression codec
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/05cd4ec2
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/05cd4ec2
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/05cd4ec2
Branch: refs/heads/master
Commit: 05cd4ec2806316f6397857bd9e58263a0de38e32
Parents: 5f3499c
Author: Wes McKinney <we...@cloudera.com>
Authored: Fri Feb 12 15:37:38 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Fri Feb 12 15:37:38 2016 -0800
----------------------------------------------------------------------
.travis.yml | 5 +-
CMakeLists.txt | 6 +
ci/run_tests.sh | 5 +
cmake_modules/FindZLIB.cmake | 92 ++++++++++++++
setup_build_env.sh | 1 +
src/parquet/compression/CMakeLists.txt | 6 +-
src/parquet/compression/codec-test.cc | 87 ++++++++++++++
src/parquet/compression/codec.h | 76 +++++++++---
src/parquet/compression/gzip-codec.cc | 171 +++++++++++++++++++++++++++
src/parquet/compression/lz4-codec.cc | 12 +-
src/parquet/compression/snappy-codec.cc | 10 +-
src/parquet/util/test-common.h | 8 ++
thirdparty/build_thirdparty.sh | 8 ++
thirdparty/download_thirdparty.sh | 5 +
thirdparty/versions.sh | 4 +
15 files changed, 465 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a860bd7..09f4705 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -40,8 +40,5 @@ before_script:
script:
- make
-- >
- if [ $TRAVIS_OS_NAME == linux ]; then
- valgrind --tool=memcheck --leak-check=yes --error-exitcode=1 ctest;
- fi
+- source $TRAVIS_BUILD_DIR/ci/run_tests.sh
- make lint
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d262375..ec7d66b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -174,6 +174,12 @@ include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
add_library(lz4static STATIC IMPORTED)
set_target_properties(lz4static PROPERTIES IMPORTED_LOCATION ${LZ4_STATIC_LIB})
+## ZLIB
+find_package(ZLIB REQUIRED)
+# ZLIB_INCLUDE_DIR is the include variable defined by cmake_modules/FindZLIB.cmake
+# (CMake's bundled FindZLIB defines it as well).
+include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
+# find_library() in FindZLIB may resolve to libz.a or to the shared library, so
+# the imported target's type is UNKNOWN and CMake links whatever file was found.
+add_library(zlibstatic UNKNOWN IMPORTED)
+set_target_properties(zlibstatic PROPERTIES IMPORTED_LOCATION "${ZLIB_LIBRARIES}")
+
## GTest
find_package(GTest REQUIRED)
include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/ci/run_tests.sh b/ci/run_tests.sh
new file mode 100755
index 0000000..aa7b2f6
--- /dev/null
+++ b/ci/run_tests.sh
@@ -0,0 +1,5 @@
+# Run the ctest suite. On Linux CI the run is wrapped in valgrind memcheck;
+# --trace-children=yes is needed because ctest launches each test binary as a
+# child process, and without it memcheck would only inspect ctest itself.
+if [ "$TRAVIS_OS_NAME" = "linux" ]; then
+  valgrind --tool=memcheck --leak-check=yes --trace-children=yes --error-exitcode=1 ctest
+else
+  ctest
+fi
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/cmake_modules/FindZLIB.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindZLIB.cmake b/cmake_modules/FindZLIB.cmake
new file mode 100644
index 0000000..0d7f2ae
--- /dev/null
+++ b/cmake_modules/FindZLIB.cmake
@@ -0,0 +1,92 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tries to find ZLIB headers and libraries.
+#
+# Usage of this module as follows:
+#
+#  find_package(ZLIB)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+#  ZLIB_HOME - When set, this path is inspected instead of standard library
+#              locations as the root of the ZLIB installation.
+#              The environment variable ZLIB_HOME overrides this variable.
+#
+# - Find ZLIB (zlib.h, libz.a, libz.so, and libz.so.1)
+# This module defines
+#  ZLIB_INCLUDE_DIR, directory containing headers
+#  ZLIB_INCLUDE_DIRS, same value as ZLIB_INCLUDE_DIR (the name CMake's bundled
+#                     FindZLIB uses)
+#  ZLIB_LIBRARIES, the zlib library file located by find_library
+#  ZLIB_LIBS, directory containing zlib libraries
+#  ZLIB_STATIC_LIB, expected path to libz.a in ZLIB_LIBS (existence not checked)
+#  ZLIB_SHARED_LIB, expected path to libz's shared library in ZLIB_LIBS
+#  ZLIB_FOUND, whether zlib has been found
+
+if( NOT "$ENV{ZLIB_HOME}" STREQUAL "")
+  file( TO_CMAKE_PATH "$ENV{ZLIB_HOME}" _native_path )
+  list( APPEND _zlib_roots ${_native_path} )
+elseif ( ZLIB_HOME )
+  list( APPEND _zlib_roots ${ZLIB_HOME} )
+endif()
+
+# Try the parameterized roots, if they exist
+if ( _zlib_roots )
+  find_path( ZLIB_INCLUDE_DIR NAMES zlib.h
+    PATHS ${_zlib_roots} NO_DEFAULT_PATH
+    PATH_SUFFIXES "include" )
+  find_library( ZLIB_LIBRARIES NAMES z
+    PATHS ${_zlib_roots} NO_DEFAULT_PATH
+    PATH_SUFFIXES "lib" )
+else ()
+  find_path( ZLIB_INCLUDE_DIR NAMES zlib.h )
+  find_library( ZLIB_LIBRARIES NAMES z )
+endif ()
+
+if (ZLIB_INCLUDE_DIR AND ZLIB_LIBRARIES)
+  set(ZLIB_FOUND TRUE)
+  # Provide the plural name too, so callers written against CMake's bundled
+  # FindZLIB keep working when this module shadows it.
+  set(ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR})
+  get_filename_component( ZLIB_LIBS "${ZLIB_LIBRARIES}" DIRECTORY )
+  set(ZLIB_LIB_NAME libz)
+  set(ZLIB_STATIC_LIB ${ZLIB_LIBS}/${ZLIB_LIB_NAME}.a)
+  set(ZLIB_SHARED_LIB ${ZLIB_LIBS}/${ZLIB_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+else ()
+  set(ZLIB_FOUND FALSE)
+endif ()
+
+if (ZLIB_FOUND)
+  if (NOT ZLIB_FIND_QUIETLY)
+    message(STATUS "Found the ZLIB library: ${ZLIB_LIBRARIES}")
+  endif ()
+else ()
+  set(ZLIB_ERR_MSG "Could not find the ZLIB library. Looked in")
+  if ( _zlib_roots )
+    set(ZLIB_ERR_MSG "${ZLIB_ERR_MSG} ${_zlib_roots}.")
+  else ()
+    set(ZLIB_ERR_MSG "${ZLIB_ERR_MSG} system search paths.")
+  endif ()
+  # REQUIRED must abort configuration even when QUIET was also requested;
+  # QUIET only suppresses the informational message.
+  if (ZLIB_FIND_REQUIRED)
+    message(FATAL_ERROR "${ZLIB_ERR_MSG}")
+  elseif (NOT ZLIB_FIND_QUIETLY)
+    message(STATUS "${ZLIB_ERR_MSG}")
+  endif ()
+endif ()
+
+mark_as_advanced(
+  ZLIB_INCLUDE_DIR
+  ZLIB_LIBS
+  ZLIB_LIBRARIES
+  ZLIB_STATIC_LIB
+  ZLIB_SHARED_LIB
+)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/setup_build_env.sh
----------------------------------------------------------------------
diff --git a/setup_build_env.sh b/setup_build_env.sh
index c95b889..6df1f49 100755
--- a/setup_build_env.sh
+++ b/setup_build_env.sh
@@ -12,6 +12,7 @@ source thirdparty/versions.sh
export SNAPPY_HOME=$BUILD_DIR/thirdparty/installed
export LZ4_HOME=$BUILD_DIR/thirdparty/installed
+export ZLIB_HOME=$BUILD_DIR/thirdparty/installed
# build script doesn't support building thrift on OSX
if [ "$(uname)" != "Darwin" ]; then
export THRIFT_HOME=$BUILD_DIR/thirdparty/installed
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
index 04f6535..2c0b67c 100644
--- a/src/parquet/compression/CMakeLists.txt
+++ b/src/parquet/compression/CMakeLists.txt
@@ -18,10 +18,12 @@
add_library(parquet_compression STATIC
lz4-codec.cc
snappy-codec.cc
+ gzip-codec.cc
)
target_link_libraries(parquet_compression
lz4static
- snappystatic)
+ snappystatic
+ zlibstatic)
set_target_properties(parquet_compression
PROPERTIES
@@ -31,3 +33,5 @@ set_target_properties(parquet_compression
install(FILES
codec.h
DESTINATION include/parquet/compression)
+
+ADD_PARQUET_TEST(codec-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/codec-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc
new file mode 100644
index 0000000..610fb37
--- /dev/null
+++ b/src/parquet/compression/codec-test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include "parquet/util/test-common.h"
+
+#include "parquet/compression/codec.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+// Compresses `data` with one codec instance and decompresses it with another,
+// then swaps roles, asserting that every round trip reproduces `data` and that
+// both instances produce compressed output of the same length.
+template <typename T>
+void CheckCodecRoundtrip(const vector<uint8_t>& data) {
+  // create multiple compressors to try to break them
+  T c1;
+  T c2;
+
+  int64_t max_compressed_len = c1.MaxCompressedLen(data.size(), data.data());
+  std::vector<uint8_t> compressed(max_compressed_len);
+  std::vector<uint8_t> decompressed(data.size());
+
+  // compress with c1
+  int64_t actual_size = c1.Compress(data.size(), data.data(), max_compressed_len,
+      compressed.data());
+  compressed.resize(actual_size);
+
+  // decompress with c2
+  c2.Decompress(compressed.size(), compressed.data(),
+      decompressed.size(), decompressed.data());
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+
+  // compress with c2. The buffer is grown back to max_compressed_len first,
+  // because that is the capacity the codec is told it may write into.
+  compressed.assign(max_compressed_len, 0);
+  int64_t actual_size2 = c2.Compress(data.size(), data.data(), max_compressed_len,
+      compressed.data());
+  ASSERT_EQ(actual_size2, actual_size);
+  compressed.resize(actual_size2);
+
+  // Wipe the previous result so the check below proves that c1 actually
+  // decompressed something, rather than re-checking round one's output.
+  decompressed.assign(data.size(), 0);
+
+  // decompress with c1
+  c1.Decompress(compressed.size(), compressed.data(),
+      decompressed.size(), decompressed.data());
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+}
+
+// Runs CheckCodecRoundtrip<T> over pseudo-random payloads of 10,000 and
+// 100,000 bytes, each generated from the fixed seed 1234.
+template <typename T>
+void CheckCodec() {
+  const std::vector<int> payload_sizes = {10000, 100000};
+  for (size_t idx = 0; idx < payload_sizes.size(); ++idx) {
+    vector<uint8_t> payload;
+    test::random_bytes(payload_sizes[idx], 1234, &payload);
+    CheckCodecRoundtrip<T>(payload);
+  }
+}
+
+// Each test round-trips random payloads (10,000 and 100,000 bytes) through a
+// pair of instances of the named codec via CheckCodec.
+TEST(TestCompressors, Snappy) {
+ CheckCodec<SnappyCodec>();
+}
+
+TEST(TestCompressors, Lz4) {
+ CheckCodec<Lz4Codec>();
+}
+
+// GZipCodec is default-constructed here, so this exercises its default Format.
+TEST(TestCompressors, GZip) {
+ CheckCodec<GZipCodec>();
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 743a17d..8fc4ada 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -20,6 +20,8 @@
#include <cstdint>
+#include <zlib.h>
+
#include "parquet/exception.h"
namespace parquet_cpp {
@@ -27,13 +29,13 @@ namespace parquet_cpp {
class Codec {
public:
virtual ~Codec() {}
- virtual void Decompress(int input_len, const uint8_t* input,
- int output_len, uint8_t* output_buffer) = 0;
+ virtual void Decompress(int64_t input_len, const uint8_t* input,
+ int64_t output_len, uint8_t* output_buffer) = 0;
- virtual int Compress(int input_len, const uint8_t* input,
- int output_buffer_len, uint8_t* output_buffer) = 0;
+ virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer) = 0;
- virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0;
+ virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;
virtual const char* name() const = 0;
};
@@ -42,13 +44,13 @@ class Codec {
// Snappy codec.
class SnappyCodec : public Codec {
public:
- virtual void Decompress(int input_len, const uint8_t* input,
- int output_len, uint8_t* output_buffer);
+ virtual void Decompress(int64_t input_len, const uint8_t* input,
+ int64_t output_len, uint8_t* output_buffer);
- virtual int Compress(int input_len, const uint8_t* input,
- int output_buffer_len, uint8_t* output_buffer);
+ virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer);
- virtual int MaxCompressedLen(int input_len, const uint8_t* input);
+ virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
virtual const char* name() const { return "snappy"; }
};
@@ -56,17 +58,61 @@ class SnappyCodec : public Codec {
// Lz4 codec.
class Lz4Codec : public Codec {
public:
- virtual void Decompress(int input_len, const uint8_t* input,
- int output_len, uint8_t* output_buffer);
+ virtual void Decompress(int64_t input_len, const uint8_t* input,
+ int64_t output_len, uint8_t* output_buffer);
- virtual int Compress(int input_len, const uint8_t* input,
- int output_buffer_len, uint8_t* output_buffer);
+ virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer);
- virtual int MaxCompressedLen(int input_len, const uint8_t* input);
+ virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
virtual const char* name() const { return "lz4"; }
};
+// GZip codec.
+class GZipCodec : public Codec {
+ public:
+  /// Compression formats supported by the zlib library
+  enum Format {
+    ZLIB,
+    DEFLATE,
+    GZIP,
+  };
+
+  explicit GZipCodec(Format format = GZIP);
+
+  // Releases whichever zlib state (deflate or inflate) is currently live in
+  // stream_; without this, the memory zlib allocated for it would leak.
+  virtual ~GZipCodec() {
+    if (compressor_initialized_) deflateEnd(&stream_);
+    if (decompressor_initialized_) inflateEnd(&stream_);
+  }
+
+  // stream_ points at zlib-owned allocations. A copy would make two codecs
+  // share (and both try to release) the same zlib state, so copying is disabled.
+  GZipCodec(const GZipCodec&) = delete;
+  GZipCodec& operator=(const GZipCodec&) = delete;
+
+  virtual void Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer);
+
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
+
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
+
+  virtual const char* name() const { return "gzip"; }
+
+ private:
+  // zlib is stateful: stream_ must be initialized (by InitCompressor() or
+  // InitDecompressor()) before it is handed to deflate()/inflate().
+  z_stream stream_;
+
+  // Realistically, this will always be GZIP, but we leave the option open to
+  // configure
+  Format format_;
+
+  // These variables are mutually exclusive. When the codec is in "compressor"
+  // state, compressor_initialized_ is true while decompressor_initialized_ is
+  // false. When it's decompressing, the opposite is true.
+  //
+  // Indeed, this is slightly hacky, but the alternative is having separate
+  // Compressor and Decompressor classes. If this ever becomes an issue, we can
+  // perform the refactoring then
+  void InitCompressor();
+  void InitDecompressor();
+  bool compressor_initialized_;
+  bool decompressor_initialized_;
+};
+
} // namespace parquet_cpp
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/gzip-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc
new file mode 100644
index 0000000..6ec2726
--- /dev/null
+++ b/src/parquet/compression/gzip-codec.cc
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/compression/codec.h"
+
+#include <cstring>
+#include <sstream>
+
+namespace parquet_cpp {
+
+// Values combined into the windowBits argument of deflateInit2() and
+// inflateInit2(). zlib describes them in prose rather than naming them.
+namespace {
+
+// Maximum window size (log2 of the LZ77 window, i.e. 32 KiB).
+constexpr int WINDOW_BITS = 15;
+
+// Added to the window bits to make deflate emit gzip output.
+constexpr int GZIP_CODEC = 16;
+
+// Added to the window bits to make inflate detect zlib vs. gzip from the header.
+constexpr int DETECT_CODEC = 32;
+
+}  // namespace
+
+// Records the requested format; zlib state is created lazily on first use.
+GZipCodec::GZipCodec(Format format)
+    : format_(format),
+      compressor_initialized_(false),
+      decompressor_initialized_(false) {}
+
+// Switches the codec into compression mode by (re)creating the deflate state
+// in stream_. Throws ParquetException if deflateInit2() fails.
+void GZipCodec::InitCompressor() {
+  // Release whatever zlib state is currently live before the memset below
+  // discards the only pointer to it.
+  if (compressor_initialized_) deflateEnd(&stream_);
+  if (decompressor_initialized_) inflateEnd(&stream_);
+  compressor_initialized_ = false;
+  decompressor_initialized_ = false;
+  memset(&stream_, 0, sizeof(stream_));
+
+  // Initialize to run specified format: negative window bits select raw
+  // deflate, +GZIP_CODEC selects a gzip wrapper, plain bits give zlib format.
+  int window_bits = WINDOW_BITS;
+  if (format_ == DEFLATE) {
+    window_bits = -window_bits;
+  } else if (format_ == GZIP) {
+    window_bits += GZIP_CODEC;
+  }
+
+  int ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
+      window_bits, 9, Z_DEFAULT_STRATEGY);
+  if (ret != Z_OK) {
+    // stream_.msg may be NULL here; never build a std::string from it blindly.
+    std::stringstream ss;
+    ss << "zlib deflateInit failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  compressor_initialized_ = true;
+}
+
+// Switches the codec into decompression mode by (re)creating the inflate
+// state in stream_. Throws ParquetException if inflateInit2() fails.
+void GZipCodec::InitDecompressor() {
+  // Release whatever zlib state is currently live before the memset below
+  // discards the only pointer to it.
+  if (compressor_initialized_) deflateEnd(&stream_);
+  if (decompressor_initialized_) inflateEnd(&stream_);
+  compressor_initialized_ = false;
+  decompressor_initialized_ = false;
+  memset(&stream_, 0, sizeof(stream_));
+
+  // Initialize to run either deflate or zlib/gzip format (the latter is
+  // auto-detected from the header via DETECT_CODEC).
+  int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
+  int ret = inflateInit2(&stream_, window_bits);
+  if (ret != Z_OK) {
+    // stream_.msg may be NULL here; never build a std::string from it blindly.
+    std::stringstream ss;
+    ss << "zlib inflateInit failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  decompressor_initialized_ = true;
+}
+
+// Inflates the complete compressed buffer `input` into `output`, which must be
+// large enough for the entire decompressed payload (only non-streaming use is
+// supported). Throws ParquetException if the output buffer is too small, the
+// lengths do not fit zlib's uInt counters, or zlib reports an error.
+void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
+    int64_t output_length, uint8_t* output) {
+  if (!decompressor_initialized_) {
+    InitDecompressor();
+  }
+  if (output_length == 0) {
+    // The zlib library does not allow *output to be NULL, even when output_length
+    // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
+    // error, so bail early if no output is expected. Note that we don't signal
+    // an error if the input actually contains compressed data.
+    return;
+  }
+
+  // z_stream tracks buffer sizes as uInt; refuse lengths that would be
+  // truncated (or are negative) instead of silently mis-sizing the buffers.
+  if (static_cast<int64_t>(static_cast<uInt>(input_length)) != input_length ||
+      static_cast<int64_t>(static_cast<uInt>(output_length)) != output_length) {
+    std::stringstream ss;
+    ss << "GZipCodec cannot handle buffers of this size. InputLength="
+       << input_length << " OutputLength=" << output_length;
+    throw ParquetException(ss.str());
+  }
+
+  // Reset the stream for this block
+  if (inflateReset(&stream_) != Z_OK) {
+    std::stringstream ss;
+    ss << "zlib inflateReset failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  // The entire compressed input and a buffer big enough for the entire
+  // decompressed output are presented in one call, so Z_FINISH can be used.
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = static_cast<uInt>(input_length);
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = static_cast<uInt>(output_length);
+
+  int ret = inflate(&stream_, Z_FINISH);
+  if (ret == Z_STREAM_END) return;
+
+  std::stringstream ss;
+  if ((ret == Z_OK || ret == Z_BUF_ERROR) && stream_.avail_out == 0) {
+    // With Z_FINISH, zlib reports "ran out of output space" as Z_BUF_ERROR.
+    ss << "Too small a buffer passed to GZipCodec. InputLength="
+       << input_length << " OutputLength=" << output_length;
+  } else {
+    // Failure for some other reason (corrupt or truncated input, ...)
+    ss << "GZipCodec failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+  }
+  throw ParquetException(ss.str());
+}
+
+// Returns zlib's deflateBound() for input_length bytes: an upper bound on the
+// size Compress() produces for that much input. deflateBound() needs a live
+// deflate stream, so calling this puts the codec into compression mode.
+int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
+  if (!compressor_initialized_) InitCompressor();
+
+  // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
+  const uLong bound = deflateBound(&stream_, static_cast<uLong>(input_length));
+  return bound;
+}
+
+// Deflates `input` into `output` as one complete stream in the configured
+// format and returns the number of bytes written. Throws ParquetException if
+// the lengths do not fit zlib's uInt counters, the output buffer is too small,
+// or zlib reports an error; after a failed deflate the stream is reset so the
+// codec remains usable.
+int64_t GZipCodec::Compress(int64_t input_length, const uint8_t* input,
+    int64_t output_length, uint8_t* output) {
+  // z_stream tracks buffer sizes as uInt; refuse lengths that would be
+  // truncated (or are negative) instead of silently mis-sizing the buffers.
+  if (static_cast<int64_t>(static_cast<uInt>(input_length)) != input_length ||
+      static_cast<int64_t>(static_cast<uInt>(output_length)) != output_length) {
+    std::stringstream ss;
+    ss << "GZipCodec cannot handle buffers of this size. InputLength="
+       << input_length << " OutputLength=" << output_length;
+    throw ParquetException(ss.str());
+  }
+
+  if (!compressor_initialized_) {
+    InitCompressor();
+  }
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = static_cast<uInt>(input_length);
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = static_cast<uInt>(output_length);
+
+  int ret = deflate(&stream_, Z_FINISH);
+  if (ret != Z_STREAM_END) {
+    std::stringstream ss;
+    if (ret == Z_OK || ret == Z_BUF_ERROR) {
+      // Z_OK (stream_.msg NOT set): output filled before the stream finished.
+      // Z_BUF_ERROR: no output space to make any progress at all.
+      ss << "zlib deflate failed, output buffer too small";
+    } else {
+      ss << "zlib deflate failed: ";
+      if (stream_.msg != NULL) ss << stream_.msg;
+    }
+    // Drop the half-written stream; otherwise the next Compress() call would
+    // still be stuck finishing this one instead of starting a new stream.
+    deflateReset(&stream_);
+    throw ParquetException(ss.str());
+  }
+
+  // Actual output length (captured before the reset below)
+  const int64_t compressed_length = output_length - stream_.avail_out;
+
+  if (deflateReset(&stream_) != Z_OK) {
+    std::stringstream ss;
+    ss << "zlib deflateReset failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  return compressed_length;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
index 7b485f6..dfd50f6 100644
--- a/src/parquet/compression/lz4-codec.cc
+++ b/src/parquet/compression/lz4-codec.cc
@@ -21,21 +21,21 @@
namespace parquet_cpp {
-void Lz4Codec::Decompress(int input_len, const uint8_t* input,
- int output_len, uint8_t* output_buffer) {
- int n = LZ4_decompress_fast(reinterpret_cast<const char*>(input),
+void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
+ int64_t output_len, uint8_t* output_buffer) {
+ int64_t n = LZ4_decompress_fast(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(output_buffer), output_len);
if (n != input_len) {
throw parquet_cpp::ParquetException("Corrupt lz4 compressed data.");
}
}
-int Lz4Codec::MaxCompressedLen(int input_len, const uint8_t* input) {
+int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
return LZ4_compressBound(input_len);
}
-int Lz4Codec::Compress(int input_len, const uint8_t* input,
- int output_buffer_len, uint8_t* output_buffer) {
+int64_t Lz4Codec::Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer) {
return LZ4_compress(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(output_buffer), input_len);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
index 0c7a63e..4135a15 100644
--- a/src/parquet/compression/snappy-codec.cc
+++ b/src/parquet/compression/snappy-codec.cc
@@ -21,20 +21,20 @@
namespace parquet_cpp {
-void SnappyCodec::Decompress(int input_len, const uint8_t* input,
- int output_len, uint8_t* output_buffer) {
+void SnappyCodec::Decompress(int64_t input_len, const uint8_t* input,
+ int64_t output_len, uint8_t* output_buffer) {
if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) {
throw parquet_cpp::ParquetException("Corrupt snappy compressed data.");
}
}
-int SnappyCodec::MaxCompressedLen(int input_len, const uint8_t* input) {
+int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
return snappy::MaxCompressedLength(input_len);
}
-int SnappyCodec::Compress(int input_len, const uint8_t* input,
- int output_buffer_len, uint8_t* output_buffer) {
+int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
+ int64_t output_buffer_len, uint8_t* output_buffer) {
size_t output_len;
snappy::RawCompress(reinterpret_cast<const char*>(input),
static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 84519d6..e75b163 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -95,6 +95,14 @@ static inline vector<bool> flip_coins(size_t n, double p) {
return draws;
}
+// Appends n pseudo-random bytes to *out, drawn uniformly from [0, 255] by a
+// std::mt19937 seeded with `seed` (same seed => same bytes). Declared static
+// inline, like flip_coins above, so that including this header from several
+// translation units does not produce duplicate-symbol link errors.
+static inline void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out) {
+  std::mt19937 gen(seed);
+  std::uniform_int_distribution<int> d(0, 255);
+
+  for (int i = 0; i < n; ++i) {
+    out->push_back(static_cast<uint8_t>(d(gen)));
+  }
+}
} // namespace test
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/build_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh
index 5e5cf6a..5e5fca4 100755
--- a/thirdparty/build_thirdparty.sh
+++ b/thirdparty/build_thirdparty.sh
@@ -16,6 +16,7 @@ else
for arg in "$*"; do
case $arg in
"lz4") F_LZ4=1 ;;
+ "zlib") F_ZLIB=1 ;;
"gtest") F_GTEST=1 ;;
"snappy") F_SNAPPY=1 ;;
"thrift") F_THRIFT=1 ;;
@@ -74,6 +75,13 @@ if [ -n "$F_ALL" -o -n "$F_LZ4" ]; then
make -j$PARALLEL install
fi
+# build zlib
+if [ -n "$F_ALL" -o -n "$F_ZLIB" ]; then
+  cd $TP_DIR/$ZLIB_BASEDIR
+  # zlib is configured in-source, so the source directory is the one we just
+  # cd'ed into; pass "." explicitly (versions.sh defines ZLIB_BASEDIR, not a
+  # ZLIB_DIR variable for zlib).
+  CFLAGS=-fPIC cmake -DCMAKE_INSTALL_PREFIX:PATH=$PREFIX .
+  make -j$PARALLEL install
+fi
+
# build thrift
if [ -n "$F_ALL" -o -n "$F_THRIFT" ]; then
if [ "$(uname)" == "Darwin" ]; then
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/download_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh
index 9111cd4..e0dd7fd 100755
--- a/thirdparty/download_thirdparty.sh
+++ b/thirdparty/download_thirdparty.sh
@@ -33,3 +33,8 @@ if [ ! -d ${THRIFT_BASEDIR} ]; then
echo "Fetching thrift"
download_extract_and_cleanup $THRIFT_URL
fi
+
+if [ ! -d ${ZLIB_BASEDIR} ]; then
+ echo "Fetching zlib"
+ download_extract_and_cleanup $ZLIB_URL
+fi
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/versions.sh
----------------------------------------------------------------------
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index 9fa2d31..8c22265 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -13,3 +13,7 @@ THRIFT_BASEDIR=thrift-$THRIFT_VERSION
GTEST_VERSION=1.7.0
GTEST_URL="https://github.com/google/googletest/archive/release-${GTEST_VERSION}.tar.gz"
GTEST_BASEDIR=googletest-release-$GTEST_VERSION
+
+ZLIB_VERSION=1.2.8
+# zlib.net only serves the newest release at its top level; superseded
+# releases are kept under /fossils, so pin the download there.
+ZLIB_URL="https://zlib.net/fossils/zlib-${ZLIB_VERSION}.tar.gz"
+ZLIB_BASEDIR=zlib-${ZLIB_VERSION}