Posted to github@arrow.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/04/06 05:29:41 UTC

[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159298271


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
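+  // Column-index min/max values are stored plain-encoded; for INT64 that is
+  // the raw 8-byte little-endian value, hence this helper.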
+  auto encode_int64 = [](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
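+  // With max_row_group_length(4), the 6 rows split into two row groups of 4
+  // and 2 rows; each vector below holds one expected value per row group.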
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
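+  // For the list column c2, null counts are taken at the leaf level, so null
+  // elements, empty lists and null lists each contribute one.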
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row groups */
+                               ->max_statistics_size(20) /* drop stats larger than this */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(0);
+    if (rg == 0) {
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_FALSE(column_index->null_pages()[0]);
+      ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+      ASSERT_EQ("short_string", column_index->encoded_min_values()[0]);
+      ASSERT_EQ("short_string", column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(0, column_index->null_counts()[0]);
+    } else {
+      // The large stats were dropped, and so was the column index.
+      ASSERT_EQ(column_index, nullptr);
+    }
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithMultiplePages) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->data_pagesize(1) /* write multiple pages */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(
+      schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
+               R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+  auto row_group_index_reader = page_index_reader->RowGroup(0);
+  ASSERT_NE(row_group_index_reader, nullptr);
+
+  // Set up the expected data.
+  auto encode_int64 = [](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
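+  // The 4th page holds only nulls: it is flagged in null_pages and its
+  // min/max slots are left empty.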
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(3),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c0_max_values = {encode_int64(2), encode_int64(4),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c1_min_values = {"a", "c", "f", ""};
+  const std::vector<std::string> c1_max_values = {"b", "d", "f", ""};
+  const std::vector<int64_t> null_counts = {0, 0, 1, 2};
+  const std::vector<bool> null_pages = {false, false, false, true};
+  const size_t num_pages = 4;
+
+  // Verify the page index column by column.
+  for (int col_idx = 0; col_idx < metadata->num_columns(); ++col_idx) {
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(col_idx);
+    ASSERT_NE(column_index, nullptr);
+    ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+    ASSERT_EQ(num_pages, column_index->null_pages().size());
+    ASSERT_TRUE(column_index->has_null_counts());
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      ASSERT_EQ(null_pages[page_idx], column_index->null_pages()[page_idx]);
+      ASSERT_EQ(null_counts[page_idx], column_index->null_counts()[page_idx]);
+      if (col_idx == 0) {
+        ASSERT_EQ(c0_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c0_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      } else {
+        ASSERT_EQ(c1_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c1_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      }
+    }
+
+    // Verify offset index.
+    auto offset_index = row_group_index_reader->GetOffsetIndex(col_idx);
+    ASSERT_NE(offset_index, nullptr);
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    int64_t prev_offset = 0;
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      const auto& page_location = offset_index->page_locations()[page_idx];
+      ASSERT_EQ(static_cast<int64_t>(page_idx * 2), page_location.first_row_index);
+      ASSERT_GT(page_location.offset, prev_offset);

Review Comment:
   Unfortunately we don't. We may need to change the `PageReader` to return a list of offsets to data pages; otherwise we have no idea. The only thing we are sure of is that the page offsets are in ascending order, which is why I use `ASSERT_GT` to check the offsets.
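   To make this concrete, the check reduces to something like the sketch below (a sketch only, built on the `OffsetIndex`/`PageLocation` accessors this PR already uses; an exact-offset check would need the `PageReader` change mentioned above):

   ```cpp
   #include <gtest/gtest.h>

   #include "parquet/page_index.h"

   // Sketch: absent exact page offsets from the writer, the strongest
   // assertion we can make is that page locations advance monotonically.
   void AssertPageLocationsAscending(const parquet::OffsetIndex& offset_index) {
     int64_t prev_offset = 0;
     int64_t prev_first_row = -1;
     for (const parquet::PageLocation& loc : offset_index.page_locations()) {
       ASSERT_GT(loc.offset, prev_offset);              // offsets strictly ascend
       ASSERT_GT(loc.first_row_index, prev_first_row);  // rows advance per page
       ASSERT_NE(0, loc.compressed_page_size);          // page is non-empty
       prev_offset = loc.offset;
       prev_first_row = loc.first_row_index;
     }
   }
   ```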


