Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/11/23 09:24:00 UTC

[jira] [Commented] (PARQUET-970) Add Lz4 and Zstd compression codecs

    [ https://issues.apache.org/jira/browse/PARQUET-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264047#comment-16264047 ] 

ASF GitHub Bot commented on PARQUET-970:
----------------------------------------

xhochy closed pull request #419: PARQUET-970: Add Lz4 and Zstd compression codecs
URL: https://github.com/apache/parquet-cpp/pull/419
 
 
   

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is reproduced below for the sake of
provenance; a short usage sketch of the new codecs follows the diff:

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c524ceb5..0183852c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -562,7 +562,9 @@ else()
       NOT DEFINED ENV{BROTLI_STATIC_LIB_DEC} OR
       NOT DEFINED ENV{BROTLI_STATIC_LIB_COMMON} OR
       NOT DEFINED ENV{SNAPPY_STATIC_LIB} OR
-      NOT DEFINED ENV{ZLIB_STATIC_LIB})
+      NOT DEFINED ENV{ZLIB_STATIC_LIB} OR
+      NOT DEFINED ENV{LZ4_STATIC_LIB} OR
+      NOT DEFINED ENV{ZSTD_STATIC_LIB})
     message(FATAL_ERROR "Missing transitive dependencies for Arrow static linking")
   endif()
 
@@ -571,6 +573,8 @@ else()
   set(BROTLI_STATIC_LIB_COMMON "$ENV{BROTLI_STATIC_LIB_COMMON}")
   set(SNAPPY_STATIC_LIB "$ENV{SNAPPY_STATIC_LIB}")
   set(ZLIB_STATIC_LIB "$ENV{ZLIB_STATIC_LIB}")
+  set(LZ4_STATIC_LIB "$ENV{LZ4_STATIC_LIB}")
+  set(ZSTD_STATIC_LIB "$ENV{ZSTD_STATIC_LIB}")
 
   add_library(brotli_enc STATIC IMPORTED)
   set_target_properties(brotli_enc PROPERTIES IMPORTED_LOCATION ${BROTLI_STATIC_LIB_ENC})
@@ -582,6 +586,10 @@ else()
   set_target_properties(snappy PROPERTIES IMPORTED_LOCATION ${SNAPPY_STATIC_LIB})
   add_library(zlib STATIC IMPORTED)
   set_target_properties(zlib PROPERTIES IMPORTED_LOCATION ${ZLIB_STATIC_LIB})
+  add_library(lz4 STATIC IMPORTED)
+  set_target_properties(lz4 PROPERTIES IMPORTED_LOCATION ${LZ4_STATIC_LIB})
+  add_library(zstd STATIC IMPORTED)
+  set_target_properties(zstd PROPERTIES IMPORTED_LOCATION ${ZSTD_STATIC_LIB})
 
   set(TRANSITIVE_LINK_LIBS
     snappy
@@ -589,6 +597,8 @@ else()
     brotli_enc
     brotli_dec
     brotli_common
+    lz4
+    zstd
   )
 
   set(ARROW_LINK_LIBS
diff --git a/ci/msvc-build.bat b/ci/msvc-build.bat
index 67df5651..29d8b839 100644
--- a/ci/msvc-build.bat
+++ b/ci/msvc-build.bat
@@ -28,7 +28,9 @@ if NOT "%CONFIGURATION%" == "Debug" (
 )
 
 if "%CONFIGURATION%" == "Toolchain" (
-  conda install -y boost-cpp=1.63 brotli=0.6.0 zlib=1.2.11 snappy=1.1.6 thrift-cpp=0.10.0 -c conda-forge
+  conda install -y boost-cpp=1.63 thrift-cpp=0.10.0 ^
+      brotli=0.6.0 zlib=1.2.11 snappy=1.1.6 lz4-c=1.7.5 zstd=1.2.0 ^
+      -c conda-forge
 
   set ARROW_BUILD_TOOLCHAIN=%MINICONDA%/Library
   set PARQUET_BUILD_TOOLCHAIN=%MINICONDA%/Library
diff --git a/ci/travis_script_static.sh b/ci/travis_script_static.sh
index 29331e97..6da7a334 100755
--- a/ci/travis_script_static.sh
+++ b/ci/travis_script_static.sh
@@ -62,6 +62,8 @@ export BROTLI_STATIC_LIB_ENC=$BROTLI_EP/libbrotlienc.a
 export BROTLI_STATIC_LIB_DEC=$BROTLI_EP/libbrotlidec.a
 export BROTLI_STATIC_LIB_COMMON=$BROTLI_EP/libbrotlicommon.a
 export ZLIB_STATIC_LIB=$ARROW_EP/zlib_ep/src/zlib_ep-install/lib/libz.a
+export LZ4_STATIC_LIB=$ARROW_EP/lz4_ep-prefix/src/lz4_ep/lib/liblz4.a
+export ZSTD_STATIC_LIB=$ARROW_EP/zstd_ep-prefix/src/zstd_ep/lib/libzstd.a
 
 cmake -DPARQUET_CXXFLAGS="$PARQUET_CXXFLAGS" \
       -DPARQUET_TEST_MEMCHECK=ON \
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index fe1d4999..386a3e19 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -359,8 +359,8 @@ if (NOT ARROW_FOUND)
     -DCMAKE_INSTALL_LIBDIR=${ARROW_LIB_DIR}
     -DARROW_JEMALLOC=OFF
     -DARROW_IPC=OFF
-    -DARROW_WITH_LZ4=OFF
-    -DARROW_WITH_ZSTD=OFF
+    -DARROW_WITH_LZ4=ON
+    -DARROW_WITH_ZSTD=ON
     -DARROW_BUILD_SHARED=${PARQUET_BUILD_SHARED}
     -DARROW_BOOST_USE_SHARED=${PARQUET_BOOST_USE_SHARED}
     -DARROW_BUILD_TESTS=OFF)
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index c20d6e2a..ec7b52ef 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -56,14 +56,16 @@ void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
   state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
 }
 
-template <Repetition::type repetition>
+template <Repetition::type repetition,
+          Compression::type codec = Compression::UNCOMPRESSED>
 static void BM_WriteInt64Column(::benchmark::State& state) {
   format::ColumnChunk thrift_metadata;
   std::vector<int64_t> values(state.range(0), 128);
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-  std::shared_ptr<WriterProperties> properties = default_writer_properties();
+  WriterProperties::Builder builder;
+  std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
   auto metadata = ColumnChunkMetaDataBuilder::Make(
       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
@@ -84,6 +86,27 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536
 
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
 
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY)
+    ->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
+    ->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
+    ->Range(1024, 65536);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD)
+    ->Range(1024, 65536);
+
 std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
                                          int64_t num_values, ColumnDescriptor* schema) {
   std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -92,14 +115,16 @@ std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
   return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
 }
 
-template <Repetition::type repetition>
+template <Repetition::type repetition,
+          Compression::type codec = Compression::UNCOMPRESSED>
 static void BM_ReadInt64Column(::benchmark::State& state) {
   format::ColumnChunk thrift_metadata;
   std::vector<int64_t> values(state.range(0), 128);
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-  std::shared_ptr<WriterProperties> properties = default_writer_properties();
+  WriterProperties::Builder builder;
+  std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
   auto metadata = ColumnChunkMetaDataBuilder::Make(
       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
@@ -134,6 +159,27 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
     ->RangePair(1024, 65536, 1, 1024);
 
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY)
+    ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
+    ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
+    ->RangePair(1024, 65536, 1, 1024);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
+    ->RangePair(1024, 65536, 1, 1024);
+
 static void BM_RleEncoding(::benchmark::State& state) {
   std::vector<int16_t> levels(state.range(0), 0);
   int64_t n = 0;
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 3e4c04f9..681f022b 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -307,6 +307,16 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
                                  LARGE_SIZE);
 }
 
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
+                                 LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
+                                 LARGE_SIZE);
+}
+
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
   this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
                                  LARGE_SIZE);
@@ -327,6 +337,16 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
                                  LARGE_SIZE);
 }
 
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
+                                 LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true,
+                                 LARGE_SIZE);
+}
+
 TYPED_TEST(TestPrimitiveWriter, Optional) {
   // Optional and non-repeated, with definition levels
   // but no repetition levels
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 39ea1d9b..0cab75f1 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -178,8 +178,9 @@ TEST_F(TestPageSerde, TestFailLargePageHeaders) {
 }
 
 TEST_F(TestPageSerde, Compression) {
-  Compression::type codec_types[3] = {Compression::GZIP, Compression::SNAPPY,
-                                      Compression::BROTLI};
+  Compression::type codec_types[5] = {Compression::GZIP, Compression::SNAPPY,
+                                      Compression::BROTLI, Compression::LZ4,
+                                      Compression::ZSTD};
 
   const int32_t num_rows = 32;  // dummy value
   data_page_header_.num_values = num_rows;
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 75f3fbdf..f9f12bea 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -209,7 +209,17 @@ TYPED_TEST(TestSerialize, SmallFileBrotli) {
   this->FileSerializeTest(Compression::BROTLI);
 }
 
-TYPED_TEST(TestSerialize, SmallFileGzip) { this->FileSerializeTest(Compression::GZIP); }
+TYPED_TEST(TestSerialize, SmallFileGzip) {
+  this->FileSerializeTest(Compression::GZIP);
+}
+
+TYPED_TEST(TestSerialize, SmallFileLz4) {
+  this->FileSerializeTest(Compression::LZ4);
+}
+
+TYPED_TEST(TestSerialize, SmallFileZstd) {
+  this->FileSerializeTest(Compression::ZSTD);
+}
 
 }  // namespace test
 
diff --git a/src/parquet/parquet.thrift b/src/parquet/parquet.thrift
index f774d492..a72ef2ca 100644
--- a/src/parquet/parquet.thrift
+++ b/src/parquet/parquet.thrift
@@ -363,6 +363,8 @@ enum CompressionCodec {
   GZIP = 2;
   LZO = 3;
   BROTLI = 4;
+  LZ4 = 5;
+  ZSTD = 6;
 }
 
 enum PageType {
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index c48fc34a..4a063c1e 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -46,12 +46,14 @@ TEST(TestWriterProperties, Basics) {
 TEST(TestWriterProperties, AdvancedHandling) {
   WriterProperties::Builder builder;
   builder.compression("gzip", Compression::GZIP);
+  builder.compression("zstd", Compression::ZSTD);
   builder.compression(Compression::SNAPPY);
   builder.encoding(Encoding::DELTA_BINARY_PACKED);
   builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
   std::shared_ptr<WriterProperties> props = builder.build();
 
   ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
+  ASSERT_EQ(Compression::ZSTD, props->compression(ColumnPath::FromDotString("zstd")));
   ASSERT_EQ(Compression::SNAPPY,
             props->compression(ColumnPath::FromDotString("delta-length")));
   ASSERT_EQ(Encoding::DELTA_BINARY_PACKED,
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
index 0652c6a8..8ec3f3b1 100644
--- a/src/parquet/types.cc
+++ b/src/parquet/types.cc
@@ -108,6 +108,12 @@ std::string CompressionToString(Compression::type t) {
     case Compression::LZO:
       return "LZO";
       break;
+    case Compression::LZ4:
+      return "LZ4";
+      break;
+    case Compression::ZSTD:
+      return "ZSTD";
+      break;
     default:
       return "UNKNOWN";
       break;
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 53b33d56..a8109449 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -109,7 +109,7 @@ struct Encoding {
 
 // Compression, mirrors parquet::CompressionCodec
 struct Compression {
-  enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI };
+  enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD };
 };
 
 // parquet::PageType
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 94b86c1a..a28917bd 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -57,6 +57,12 @@ static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::typ
     case Compression::BROTLI:
       PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
       break;
+    case Compression::LZ4:
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZ4, &result));
+      break;
+    case Compression::ZSTD:
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::ZSTD, &result));
+      break;
     default:
       break;
   }
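
Below is a minimal usage sketch, not part of the patch above, of how the new
codecs can be selected once this change lands. It mirrors the builder calls
exercised in properties-test.cc and column-io-benchmark.cc; the include paths,
the column path "values.lz4", and the assumption that CompressionToString is
declared in parquet/types.h are illustrative rather than taken from the PR.

#include <iostream>
#include <memory>

#include "parquet/properties.h"  // WriterProperties::Builder (path assumed)
#include "parquet/types.h"       // Compression::LZ4, Compression::ZSTD

int main() {
  parquet::WriterProperties::Builder builder;
  // File-wide default codec; ZSTD is one of the two codecs added by this patch.
  builder.compression(parquet::Compression::ZSTD);
  // Per-column override, keyed by a (hypothetical) dotted column path.
  builder.compression("values.lz4", parquet::Compression::LZ4);
  std::shared_ptr<parquet::WriterProperties> props = builder.build();

  // In real code, props would be handed to the file/column writers, as the
  // benchmark changes above do via ColumnChunkMetaDataBuilder::Make.
  (void)props;

  // CompressionToString gains LZ4/ZSTD cases in this patch; this prints "ZSTD".
  std::cout << parquet::CompressionToString(parquet::Compression::ZSTD) << std::endl;
  return 0;
}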


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add Lz4 and Zstd compression codecs
> -----------------------------------
>
>                 Key: PARQUET-970
>                 URL: https://issues.apache.org/jira/browse/PARQUET-970
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: Xianjin YE
>            Assignee: Xianjin YE
>             Fix For: cpp-1.4.0
>
>
> https://github.com/facebook/zstd looks quite promising, so I'd like to add it as a compressor in parquet-cpp.
> Lz4 and Zstd codecs are added here since parquet-format already supports them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)