You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/12/13 17:28:28 UTC

(arrow) branch main updated: GH-39210: [C++][Parquet] Avoid WriteRecordBatch from produce zero-sized RowGroup (#39211)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4142607f61 GH-39210: [C++][Parquet] Avoid WriteRecordBatch from produce zero-sized RowGroup (#39211)
4142607f61 is described below

commit 4142607f61a2e52fddaaee6e82a9e1be1d462cd9
Author: mwish <ma...@gmail.com>
AuthorDate: Thu Dec 14 01:28:21 2023 +0800

    GH-39210: [C++][Parquet] Avoid WriteRecordBatch from produce zero-sized RowGroup (#39211)
    
    
    
    ### Rationale for this change
    
    `WriteRecordBatch` might produce a zero-sized row group, as reported in https://github.com/apache/arrow/issues/39210 . This patch prevents `WriteRecordBatch` from producing a zero-sized row group.
    
    ### What changes are included in this PR?
    
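    Add a check so that a new row group is only started when the current batch still has rows left to write, which avoids creating a zero-sized row group.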
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    no
    
    * Closes: #39210
    
    Lead-authored-by: mwish <ma...@gmail.com>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 41 +++++++++++++++++++++++
 cpp/src/parquet/arrow/writer.cc                   |  6 ++--
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index a2f3498190..dd0b19c2ce 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5224,6 +5224,47 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   EXPECT_TRUE(record_batch->Equals(*read_record_batch));
 }
 
+TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) {
+  // GH-39211: WriteRecordBatch should not write an empty row group
+  // at the end of the file.
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max number of rows in a row group to 2
+  auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  // NewBufferedRowGroup() is not called explicitly here; WriteRecordBatch()
+  // is expected to call it internally.
+  // Write two batches of 20 rows each (40 rows in total).
+  for (int i = 0; i < 2; ++i) {
+    auto record_batch =
+        gen.BatchOf({::arrow::field("a", ::arrow::int64())}, /*length=*/20);
+    ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(20, file_metadata->num_row_groups());  // 40 rows / 2 rows per group
+  for (int i = 0; i < 20; ++i) {
+    EXPECT_EQ(2, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
 TEST(TestArrowReadWrite, MultithreadedWrite) {
   const int num_columns = 20;
   const int num_rows = 1000;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 07c627d5ed..5238986c42 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -419,6 +419,7 @@ class FileWriterImpl : public FileWriter {
     // Max number of rows allowed in a row group.
     const int64_t max_row_group_length = this->properties().max_row_group_length();
 
+    // Initialize a new buffered row group writer if necessary.
     if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
         row_group_writer_->num_rows() >= max_row_group_length) {
       RETURN_NOT_OK(NewBufferedRowGroup());
@@ -461,8 +462,9 @@ class FileWriterImpl : public FileWriter {
       RETURN_NOT_OK(WriteBatch(offset, batch_size));
       offset += batch_size;
 
-      // Flush current row group if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length) {
+      // Flush current row group writer and create a new writer if it is full.
+      if (row_group_writer_->num_rows() >= max_row_group_length &&
+          offset < batch.num_rows()) {
         RETURN_NOT_OK(NewBufferedRowGroup());
       }
     }