Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/05 14:43:29 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #7789: PARQUET-1878: [C++] lz4 codec is not compatible with Hadoop Lz4Codec

pitrou commented on a change in pull request #7789:
URL: https://github.com/apache/arrow/pull/7789#discussion_r465764598



##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +351,96 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* output_buffer) override {
+    // The following variables only make sense if the parquet file being read was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    //
+    // The Hadoop Lz4Codec source code can be found here:
+    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+    uint32_t expected_decompressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+    uint32_t expected_compressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
+    int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;
+
+    // We use a heuristic to determine if the parquet file being read
+    // was compressed using the Hadoop Lz4Codec.
+    int64_t decompressed_size;
+    if (lz4_compressed_buffer_size != expected_compressed_size) {
+      // Parquet file was compressed without Hadoop Lz4Codec
+      ARROW_ASSIGN_OR_RAISE(
+          decompressed_size,
+          Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer));
+    } else {
+      // Parquet file was likely compressed with Hadoop Lz4Codec
+      Result<int64_t> decompressed_size_result =
+          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
+                               output_buffer_len, output_buffer);
+
+      if (!decompressed_size_result.ok() ||
+          decompressed_size_result.ValueOrDie() != expected_decompressed_size) {
+        // Fall back on regular LZ4-block decompression
+        ARROW_ASSIGN_OR_RAISE(
+            decompressed_size,
+            Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer));

Review comment:
       Just `return Lz4Codec::Decompress(...)`.
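
That is, the fallback inside the else branch can return directly instead of assigning through the `decompressed_size` local. A minimal sketch of the suggested shape (not the final PR code):

    // Fall back on regular LZ4-block decompression; returning directly removes
    // the need for the intermediate ARROW_ASSIGN_OR_RAISE and local variable.
    return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);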

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +351,96 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* output_buffer) override {
+    // The following variables only make sense if the parquet file being read was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    //
+    // The Hadoop Lz4Codec source code can be found here:
+    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+    uint32_t expected_decompressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+    uint32_t expected_compressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));

Review comment:
       Before doing those two loads, you must ensure that `input_len` is at least `kPrefixLength`.
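
A sketch of the guard being asked for, assuming `kPrefixLength` is the two-`uint32_t` (8-byte) Hadoop prefix named in the diff; buffers too short to hold it can go straight to the base implementation:

    // Buffers shorter than the Hadoop prefix cannot be Hadoop-framed, so skip
    // the two loads entirely and treat the input as a plain LZ4 block.
    if (input_len < kPrefixLength) {
      return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
    }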

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +351,96 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* output_buffer) override {
+    // The following variables only make sense if the parquet file being read was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    //
+    // The Hadoop Lz4Codec source code can be found here:
+    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+    uint32_t expected_decompressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+    uint32_t expected_compressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
+    int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;
+
+    // We use a heuristic to determine if the parquet file being read
+    // was compressed using the Hadoop Lz4Codec.
+    int64_t decompressed_size;
+    if (lz4_compressed_buffer_size != expected_compressed_size) {
+      // Parquet file was compressed without Hadoop Lz4Codec
+      ARROW_ASSIGN_OR_RAISE(
+          decompressed_size,
+          Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer));
+    } else {
+      // Parquet file was likely compressed with Hadoop Lz4Codec
+      Result<int64_t> decompressed_size_result =
+          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
+                               output_buffer_len, output_buffer);
+
+      if (!decompressed_size_result.ok() ||
+          decompressed_size_result.ValueOrDie() != expected_decompressed_size) {
+        // Fall back on regular LZ4-block decompression
+        ARROW_ASSIGN_OR_RAISE(
+            decompressed_size,
+            Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer));
+      } else {
+        decompressed_size = decompressed_size_result.ValueOrDie();

Review comment:
       `return decompressed_size_result.ValueOrDie()`
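
In other words, the success path can also return directly; a minimal sketch:

    // Hadoop framing detected and the decompressed size matches: nothing left to do.
    return decompressed_size_result.ValueOrDie();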

##########
File path: cpp/src/parquet/reader_test.cc
##########
@@ -58,6 +58,79 @@ std::string nation_dict_truncated_data_page() {
   return data_file("nation.dict-malformed.parquet");
 }
 
+std::string hadoop_lz4_compressed() { return data_file("hadoop_lz4_compressed.parquet"); }
+
+std::string non_hadoop_lz4_compressed() {
+  return data_file("non_hadoop_lz4_compressed.parquet");
+}
+
+// TODO: Assert on definition and repetition levels
+template <typename DType, typename ValueType>
+void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
+                        int64_t expected_levels_read,
+                        std::vector<ValueType>& expected_values,
+                        int64_t expected_values_read) {
+  std::vector<ValueType> values(batch_size);
+  int64_t values_read;
+
+  auto levels_read =
+      col->ReadBatch(batch_size, nullptr, nullptr, values.data(), &values_read);
+  ASSERT_EQ(expected_levels_read, levels_read);
+
+  ASSERT_EQ(expected_values, values);
+  ASSERT_EQ(expected_values_read, values_read);
+}
+
+struct CodecTestParam {
+  CodecTestParam(std::string data_file, uint32_t expected_metadata_size)
+      : data_file(data_file), expected_metadata_size(expected_metadata_size) {}
+
+  std::string data_file;
+  uint32_t expected_metadata_size;
+};
+
+class TestCodec : public ::testing::TestWithParam<CodecTestParam> {
+ protected:
+  const std::string& GetDataFile() { return GetParam().data_file; }
+
+  uint32_t GetExpectedMetadataSize() { return GetParam().expected_metadata_size; }
+};
+
+TEST_P(TestCodec, FileMetadataAndValues) {
+  std::unique_ptr<ParquetFileReader> reader_ = ParquetFileReader::OpenFile(GetDataFile());
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
+
+  // This file only has 4 rows
+  ASSERT_EQ(4, reader_->metadata()->num_rows());
+  // This file only has 3 columns
+  ASSERT_EQ(3, reader_->metadata()->num_columns());
+  // This file only has 1 row group
+  ASSERT_EQ(1, reader_->metadata()->num_row_groups());
+  // Size of the metadata is given by GetExpectedMetadataSize()
+  ASSERT_EQ(GetExpectedMetadataSize(), reader_->metadata()->size());
+  // This row group must have 4 rows
+  ASSERT_EQ(4, group->metadata()->num_rows());
+
+  // column 0, c0
+  std::shared_ptr<Int64Reader> col0 =
+      std::dynamic_pointer_cast<Int64Reader>(group->Column(0));

Review comment:
       We would use `::arrow::util::checked_pointer_cast` here.
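
For reference, a minimal sketch of that cast, assuming the helper declared in `arrow/util/checked_cast.h`; the exact namespace qualification may differ between Arrow versions:

    #include "arrow/util/checked_cast.h"

    // checked_pointer_cast verifies the cast in debug builds and behaves like a
    // static cast in release builds, instead of the unconditional RTTI-based
    // check done by std::dynamic_pointer_cast.
    std::shared_ptr<Int64Reader> col0 =
        ::arrow::internal::checked_pointer_cast<Int64Reader>(group->Column(0));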

##########
File path: cpp/src/arrow/util/compression_lz4.cc
##########
@@ -349,6 +351,96 @@ class Lz4Codec : public Codec {
   const char* name() const override { return "lz4_raw"; }
 };
 
+// ----------------------------------------------------------------------
+// Lz4 Hadoop "raw" codec implementation
+
+class Lz4HadoopCodec : public Lz4Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* output_buffer) override {
+    // The following variables only make sense if the parquet file being read was
+    // compressed using the Hadoop Lz4Codec.
+    //
+    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
+    // of the input buffer two uint32_t's representing (in this order) expected
+    // decompressed size in bytes and expected compressed size in bytes.
+    //
+    // The Hadoop Lz4Codec source code can be found here:
+    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+    uint32_t expected_decompressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+    uint32_t expected_compressed_size =
+        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
+    int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;
+
+    // We use a heuristic to determine if the parquet file being read
+    // was compressed using the Hadoop Lz4Codec.
+    int64_t decompressed_size;
+    if (lz4_compressed_buffer_size != expected_compressed_size) {
+      // Parquet file was compressed without Hadoop Lz4Codec
+      ARROW_ASSIGN_OR_RAISE(

Review comment:
       For simplicity, this can just be `return Lz4Codec::Decompress(...)`.
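
Putting this together with the earlier comments, the whole method body could read roughly as below. This is only a sketch of the suggested early-return structure, reusing the names from the diff (including `kPrefixLength`, assumed to be the 8-byte Hadoop prefix); it is not the final code of the PR:

    Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                               int64_t output_buffer_len, uint8_t* output_buffer) override {
      // Too short to carry the two big-endian uint32 size fields: treat the
      // input as a plain LZ4 block (see the earlier comment about kPrefixLength).
      if (input_len < kPrefixLength) {
        return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
      }

      const uint32_t expected_decompressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
      const uint32_t expected_compressed_size =
          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
      const int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;

      if (lz4_compressed_buffer_size != expected_compressed_size) {
        // The prefix does not describe this buffer: not Hadoop-framed.
        return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
      }

      // Likely Hadoop-framed: skip the prefix and decompress the LZ4 block.
      Result<int64_t> result =
          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
                               output_buffer_len, output_buffer);
      if (result.ok() && result.ValueOrDie() == expected_decompressed_size) {
        return result;
      }

      // The heuristic misfired: fall back on regular LZ4-block decompression.
      return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
    }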

##########
File path: cpp/src/arrow/util/compression.cc
##########
@@ -131,7 +131,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
       if (compression_level_set) {
         return Status::Invalid("LZ4 doesn't support setting a compression level.");
       }
-      codec = internal::MakeLz4RawCodec();
+      codec = internal::MakeLz4HadoopRawCodec();

Review comment:
       @wesm is right. Also, we probably want to add specific tests in `arrow/util/compression_test.cc`.
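
A rough sketch of the kind of round-trip test that could go into `arrow/util/compression_test.cc`: it compresses a buffer with the LZ4 codec, hand-builds the 8-byte Hadoop prefix described in the diff (big-endian decompressed size, then big-endian compressed size), and checks that `Decompress` accepts the prefixed buffer. The test name and sample data are illustrative only, and it assumes the Hadoop-compatible path is reachable through the codec returned by `Codec::Create(Compression::LZ4)`:

    #include <cstring>
    #include <string>
    #include <vector>

    #include <gtest/gtest.h>

    #include "arrow/testing/gtest_util.h"
    #include "arrow/util/compression.h"

    TEST(TestLz4HadoopCompatibility, DecompressPrefixedBuffer) {
      ASSERT_OK_AND_ASSIGN(auto codec,
                           ::arrow::util::Codec::Create(::arrow::Compression::LZ4));

      const std::string data = "hadoop hadoop hadoop hadoop hadoop hadoop hadoop";
      const auto* input = reinterpret_cast<const uint8_t*>(data.data());
      const int64_t input_len = static_cast<int64_t>(data.size());

      std::vector<uint8_t> compressed(codec->MaxCompressedLen(input_len, input));
      ASSERT_OK_AND_ASSIGN(
          int64_t compressed_len,
          codec->Compress(input_len, input, static_cast<int64_t>(compressed.size()),
                          compressed.data()));

      // Prepend the two big-endian uint32 size fields used by the Hadoop Lz4Codec.
      auto put_be32 = [](uint32_t v, std::vector<uint8_t>* out) {
        for (int shift = 24; shift >= 0; shift -= 8) {
          out->push_back(static_cast<uint8_t>((v >> shift) & 0xFF));
        }
      };
      std::vector<uint8_t> hadoop_buffer;
      put_be32(static_cast<uint32_t>(input_len), &hadoop_buffer);
      put_be32(static_cast<uint32_t>(compressed_len), &hadoop_buffer);
      hadoop_buffer.insert(hadoop_buffer.end(), compressed.data(),
                           compressed.data() + compressed_len);

      std::vector<uint8_t> decompressed(data.size());
      ASSERT_OK_AND_ASSIGN(
          int64_t decompressed_len,
          codec->Decompress(static_cast<int64_t>(hadoop_buffer.size()),
                            hadoop_buffer.data(),
                            static_cast<int64_t>(decompressed.size()),
                            decompressed.data()));
      ASSERT_EQ(input_len, decompressed_len);
      ASSERT_EQ(0, std::memcmp(decompressed.data(), input, data.size()));
    }

A companion test could do the reverse (decompress an un-prefixed LZ4 block through the same codec) to exercise the fallback path of the heuristic.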




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org