Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/02/22 13:52:43 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #33897: GH-33652: [C++][Parquet] Add interface total_compressed_bytes_written

westonpace commented on code in PR #33897:
URL: https://github.com/apache/arrow/pull/33897#discussion_r1114341142


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -526,7 +530,13 @@ class SerializedPageWriter : public PageWriter {
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;
+  // The uncompressed page size the page writer already
+  //  written.

Review Comment:
   ```suggestion
     // The uncompressed page size the page writer has already
     //  written.
   ```



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,143 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+class TestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    std::vector<schema::NodePtr> fields;
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = false) {
+    auto builder = WriterProperties::Builder();
+    builder.disable_dictionary()
+        ->disable_dictionary()

Review Comment:
   ```suggestion
   ```
   You don't need this, since you enable/disable the dictionary below.
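   With that removed, the setup would read something like this (untested sketch):
   ```cpp
   auto builder = WriterProperties::Builder();
   builder.compression(compression)->data_pagesize(100 * sizeof(int));
   if (enable_dictionary) {
     builder.enable_dictionary();
   } else {
     builder.disable_dictionary();
   }
   writer_properties_ = builder.build();
   ```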



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -526,7 +530,13 @@ class SerializedPageWriter : public PageWriter {
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;
+  // The uncompressed page size the page writer already
+  //  written.
   int64_t total_uncompressed_size_;
+  // The compressed page size the page writer already
+  //  written.

Review Comment:
   ```suggestion
     // The compressed page size the page writer has already
     //  written.
   ```



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,143 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+class TestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    std::vector<schema::NodePtr> fields;
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = false) {
+    auto builder = WriterProperties::Builder();
+    builder.disable_dictionary()
+        ->disable_dictionary()
+        ->compression(compression)
+        ->data_pagesize(100 * sizeof(int));
+    if (enable_dictionary) {
+      builder.enable_dictionary();
+    } else {
+      builder.disable_dictionary();
+    }
+    writer_properties_ = builder.build();
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_,
+                                                 schema_descriptor_->Column(0));
+
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(),
+        /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1,
+        ::arrow::default_memory_pool(), /* buffered_row_group */ buffered,
+        /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR,
+        /* enable_checksum */ false);
+    return std::static_pointer_cast<parquet::Int32Writer>(
+        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()));
+  }
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
+  std::shared_ptr<GroupNode> node_;
+  std::unique_ptr<SchemaDescriptor> schema_descriptor_;
+
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+};
+
+TEST_F(TestSizeEstimated, NonBuffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // unbuffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, Buffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, true);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, NonBufferedDictionary) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false, true);
+
+  // Write half page
+  // for dict, keep value equal
+  int32_t dict_value = 1;
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page flushed, check size

Review Comment:
   ```suggestion
   ```
   I don't think the page has been flushed yet at this point, given the "write a huge batch to trigger page flush" comment below.



##########
cpp/src/parquet/column_writer.h:
##########
@@ -131,14 +134,23 @@ class PARQUET_EXPORT ColumnWriter {
   /// \brief The number of rows written so far
   virtual int64_t rows_written() const = 0;
 
-  /// \brief The total size of the compressed pages + page headers. Some values
-  /// might be still buffered and not written to a page yet
+  /// \brief The total size of the compressed pages + page headers. Values
+  /// are still buffered and not written to a pager yet

Review Comment:
   Is this the number of bytes that are buffered but not yet written?
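   If so, it may be clearer to spell out how the three accessors relate. My reading from the tests in this PR (a sketch, not verified against the implementation):
   ```cpp
   // total_bytes_written()            : uncompressed bytes of pages already written to the pager
   // total_compressed_bytes()         : compressed bytes buffered for pages not yet written
   // total_compressed_bytes_written() : compressed bytes of pages already written to the pager
   ```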



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,143 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+class TestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    std::vector<schema::NodePtr> fields;
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = false) {
+    auto builder = WriterProperties::Builder();
+    builder.disable_dictionary()
+        ->disable_dictionary()
+        ->compression(compression)
+        ->data_pagesize(100 * sizeof(int));
+    if (enable_dictionary) {
+      builder.enable_dictionary();
+    } else {
+      builder.disable_dictionary();
+    }
+    writer_properties_ = builder.build();
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_,
+                                                 schema_descriptor_->Column(0));
+
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(),
+        /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1,
+        ::arrow::default_memory_pool(), /* buffered_row_group */ buffered,
+        /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR,
+        /* enable_checksum */ false);
+    return std::static_pointer_cast<parquet::Int32Writer>(
+        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()));
+  }
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
+  std::shared_ptr<GroupNode> node_;
+  std::unique_ptr<SchemaDescriptor> schema_descriptor_;
+
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+};
+
+TEST_F(TestSizeEstimated, NonBuffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // unbuffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, Buffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, true);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, NonBufferedDictionary) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false, true);
+
+  // Write half page
+  // for dict, keep value equal
+  int32_t dict_value = 1;
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // write a huge batch to trigger page flush
+  for (int32_t i = 0; i < 50000; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_LT(400, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+
+  required_writer->Close();
+}
+
+TEST_F(TestSizeEstimated, BufferedCompression) {
+#ifndef ARROW_WITH_SNAPPY
+  GTEST_SKIP() << "Test requires snappy compression";
+#endif
+  auto required_writer = this->BuildWriter(Compression::SNAPPY, true);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size

Review Comment:
   ```suggestion
   ```
   If it's been flushed, then why are the bytes written still 0 here?



##########
cpp/src/parquet/column_writer.h:
##########
@@ -136,8 +138,15 @@ class PARQUET_EXPORT ColumnWriter {
 
   /// \brief The total number of bytes written as serialized data and
   /// dictionary pages to the ColumnChunk so far
+  /// These bytes are uncompressed bytes.
   virtual int64_t total_bytes_written() const = 0;
 
+  /// \brief The total number of bytes written as serialized data and
+  /// dictionary pages to the ColumnChunk so far.
+  /// If the column is uncompressed, the value would be equal to

Review Comment:
   Do both of them record bytes from dictionary pages written?
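   If they do, an explicit check might be worth adding to the dictionary test, e.g. (sketch, assuming `Close()` flushes the dictionary page):
   ```cpp
   required_writer->Close();
   // If both counters include dictionary pages, both should be
   // non-zero once the dictionary page has been written.
   EXPECT_LT(0, required_writer->total_bytes_written());
   EXPECT_LT(0, required_writer->total_compressed_bytes_written());
   ```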



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,143 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+class TestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    std::vector<schema::NodePtr> fields;
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = false) {
+    auto builder = WriterProperties::Builder();
+    builder.disable_dictionary()
+        ->disable_dictionary()
+        ->compression(compression)
+        ->data_pagesize(100 * sizeof(int));
+    if (enable_dictionary) {
+      builder.enable_dictionary();
+    } else {
+      builder.disable_dictionary();
+    }
+    writer_properties_ = builder.build();
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_,
+                                                 schema_descriptor_->Column(0));
+
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(),
+        /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1,
+        ::arrow::default_memory_pool(), /* buffered_row_group */ buffered,
+        /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR,
+        /* enable_checksum */ false);
+    return std::static_pointer_cast<parquet::Int32Writer>(
+        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()));
+  }
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
+  std::shared_ptr<GroupNode> node_;
+  std::unique_ptr<SchemaDescriptor> schema_descriptor_;
+
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+};
+
+TEST_F(TestSizeEstimated, NonBuffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // unbuffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, Buffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, true);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(TestSizeEstimated, NonBufferedDictionary) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, false, true);
+
+  // Write half page
+  // for dict, keep value equal
+  int32_t dict_value = 1;
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // write a huge batch to trigger page flush
+  for (int32_t i = 0; i < 50000; i++) {

Review Comment:
   Just a question.  Why do we need such a large batch to flush a page when writing a dictionary?
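   (My guess: the dictionary-encoded data page only buffers the RLE/bit-packed indices, so its estimated size grows much more slowly than 4 bytes per value; if that's the cause, a short comment in the test would help.)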


