You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "wjones127 (via GitHub)" <gi...@apache.org> on 2023/02/21 20:31:58 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

wjones127 commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113516374


##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,140 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          schema::Int32("required", Repetition::REQUIRED),
+                          schema::Int32("optional", Repetition::OPTIONAL),
+                          schema::Int32("repeated", Repetition::REPEATED),
+                      }));
+  // Write 11 levels at a time
+  constexpr int64_t batch_size = 11;
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->write_batch_size(batch_size)
+                        ->data_pagesize(1)
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_levels = 100;
+  const std::vector<int32_t> values(num_levels, 1024);
+  std::array<int16_t, num_levels> def_levels;
+  std::array<int16_t, num_levels> rep_levels;
+  for (int32_t i = 0; i < num_levels; i++) {
+    def_levels[i] = i % 2 == 0 ? 1 : 0;
+    rep_levels[i] = i % 2 == 0 ? 0 : 1;
+  }
+
+  auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+  // Write a null value at every other row.
+  auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr, values.data());
+
+  // Each row has repeated twice.
+  auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Check if pages are changed on record boundaries.
+  constexpr int num_columns = 3;
+  const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+  for (int i = 0; i < num_columns; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_rows = 0;
+    int64_t num_pages = 0;
+    std::shared_ptr<Page> page;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      if (i < 2) {
+        EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+      } else {
+        // Make sure repeated column has 2 values per row and not span multiple pages.
+        EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+      }
+      num_rows += data_page->num_rows();
+      num_pages++;
+    }
+    EXPECT_EQ(num_levels, num_rows);
+    EXPECT_EQ(expected_num_pages[i], num_pages);
+  }
+}
+
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesLargeBatchSize) {

Review Comment:
   For this test, what I would be more interested in seeing is writing a column where the number of repeats > `batch_size`. What do you think of that? Also perhaps a case where `number of repeats` = `batch_size` - 1.



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   Why is this called `num_levels`? It seems to represent the number of values, and in turn the length of `def_levels` and `rep_levels`. Is that correct? If so, `num_values` seems more appropriate, as I think `num_levels` implies "depth" rather than "length".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org