Posted to github@arrow.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/02/06 15:22:43 UTC

[GitHub] [arrow] wgtmac opened a new pull request, #34054: GH-34053: [C++][Parquet] Write parquet page index

wgtmac opened a new pull request, #34054:
URL: https://github.com/apache/arrow/pull/34054

   ### Rationale for this change
   
   The Parquet C++ reader supports reading the page index from a file, but the writer does not yet support writing it.
   
   ### What changes are included in this PR?
   
   The Parquet file writer now collects page index data from all data pages and serializes the page index into the file.
   
   ### Are these changes tested?
   
   Not yet; tests will be added later.
   
   ### Are there any user-facing changes?
   
   `WriterProperties::enable_write_page_index()` and `WriterProperties::disable_write_page_index()` have been added to toggle writing the page index on and off.
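   
   A minimal usage sketch (only `enable_write_page_index()` / `disable_write_page_index()` are new in this PR; the rest is the existing `WriterProperties` builder API):
   
   ```cpp
   #include "parquet/properties.h"
   
   // Opt in to writing the page index via the new builder toggle.
   auto with_index = parquet::WriterProperties::Builder()
                         .enable_write_page_index()
                         ->build();
   
   // ...or switch it off explicitly.
   auto without_index = parquet::WriterProperties::Builder()
                            .disable_write_page_index()
                            ->build();
   ```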


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1419259419

   * Closes: #34053


[GitHub] [arrow] etseidl commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "etseidl (via GitHub)" <gi...@apache.org>.
etseidl commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426288326

   @wgtmac another issue to consider as you implement the page index is rows that span multiple pages. With nested columns, it is possible to have single rows that are so large that they exceed the requested page size.  arrow-cpp currently will honor the page size by splitting these rows across multiple pages.  The current parquet spec, however, seems to require that pages begin at row boundaries (i.e. the repetition level R is 0 for the first value in each page, see [here](https://github.com/apache/parquet-format/blob/5205dc7b7c0b910ea6af33cadbd2963c0c47c726/src/main/thrift/parquet.thrift#L564) and [here](https://github.com/apache/parquet-format/blob/5205dc7b7c0b910ea6af33cadbd2963c0c47c726/src/main/thrift/parquet.thrift#L918)).  Do you concur and think this should be another blocking issue or part of this PR?
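   
   For reference, a minimal sketch of the constraint in question (a hypothetical helper, not existing Arrow code):
   
   ```cpp
   #include <cstdint>
   
   // Per the parquet-format comments linked above, a data page may only begin
   // at a record boundary: the first repetition level in the page must be 0.
   bool PageStartsAtRecordBoundary(const int16_t* rep_levels, int64_t num_levels) {
     return num_levels == 0 || rep_levels[0] == 0;
   }
   ```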


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426589075

   > @wgtmac another issue to consider as you implement the page index is rows that span multiple pages. With nested columns, it is possible to have single rows that are so large that they exceed the requested page size. arrow-cpp currently will honor the page size by splitting these rows across multiple pages. The current parquet spec, however, seems to require that pages begin at row boundaries (i.e. the repetition level R is 0 for the first value in each page, see [here](https://github.com/apache/parquet-format/blob/5205dc7b7c0b910ea6af33cadbd2963c0c47c726/src/main/thrift/parquet.thrift#L564) and [here](https://github.com/apache/parquet-format/blob/5205dc7b7c0b910ea6af33cadbd2963c0c47c726/src/main/thrift/parquet.thrift#L918)). Do you concur and think this should be another blocking issue or part of this PR?
   
   Thanks for the information, @etseidl.
   
   Yes, I have already noticed that a record may span multiple pages. In parquet-cpp, however, the page size check always happens at the end of each batch, which guarantees that a page will never split a record. Please check this function, as well as where it is called, for reference: https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_writer.cc#L1376
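   
   Roughly, the flow is like the sketch below (illustrative only, with hypothetical names; see the linked `column_writer.cc` for the real logic):
   
   ```cpp
   #include <cstdint>
   #include <vector>
   
   // Illustrative sketch: the page-size check runs only after a complete
   // batch has been buffered, so a page boundary always coincides with a
   // batch boundary and never falls inside a record.
   class PageBufferSketch {
    public:
     explicit PageBufferSketch(int64_t page_size_limit)
         : page_size_limit_(page_size_limit) {}
   
     void WriteBatch(const std::vector<uint8_t>& encoded_batch) {
       buffer_.insert(buffer_.end(), encoded_batch.begin(), encoded_batch.end());
       // Only now, with the whole batch appended, may a page be cut.
       if (static_cast<int64_t>(buffer_.size()) >= page_size_limit_) {
         FlushPage();
       }
     }
   
    private:
     void FlushPage() { buffer_.clear(); /* emit one data page */ }
   
     std::vector<uint8_t> buffer_;
     int64_t page_size_limit_;
   };
   ```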


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1501956345

   @wjones127 Could we make it to the v12.0.0 release?


[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157957099


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than it */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(0);
+    if (rg == 0) {
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_FALSE(column_index->null_pages()[0]);
+      ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+      ASSERT_EQ("short_string", column_index->encoded_min_values()[0]);
+      ASSERT_EQ("short_string", column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(0, column_index->null_counts()[0]);
+    } else {
+      // Large stats have been dropped, and so is the column index.
+      ASSERT_EQ(column_index, nullptr);
+    }
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);

Review Comment:
   small nit: at least in [later versions of gtest](https://google.github.io/googletest/primer.html) the expected value goes last (I forget what version we are running here, and I think it might have flip-flopped at some point).
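   
   For example, mirroring the assertion above (illustrative only):
   
   ```cpp
   // Expected value last, per the convention referenced above:
   ASSERT_EQ(offset_index->page_locations()[0].first_row_index, 0);
   // rather than:
   ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
   ```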



[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157983937


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteDoubleColumnIndex) {
+  auto encode = [=](double value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+  };
+
+  // Double values with no specific order and without null count.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4));
+  page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5));
+  page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6));
+
+  TestWriteTypedColumnIndex(schema::Double("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteByteArrayColumnIndex) {
+  // Byte array values with identical min/max.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min("bar").set_max("foo");
+  page_stats.at(1).set_min("bar").set_max("foo");
+  page_stats.at(2).set_min("bar").set_max("foo");
+
+  TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteFLBAColumnIndex) {
+  // FLBA values in the ascending order with some null pages
+  std::vector<EncodedStatistics> page_stats(5);
+  page_stats.at(0).set_min("abc").set_max("ABC");
+  page_stats.at(1).all_null_value = true;
+  page_stats.at(2).set_min("foo").set_max("FOO");
+  page_stats.at(3).all_null_value = true;
+  page_stats.at(4).set_min("xyz").set_max("XYZ");
+
+  auto node =
+      schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
+                                  ConvertedType::NONE, /*length=*/3);
+  TestWriteTypedColumnIndex(std::move(node), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithAllNullPages) {
+  // All values are null.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(100).all_null_value = true;
+  page_stats.at(1).set_null_count(100).all_null_value = true;
+  page_stats.at(2).set_null_count(100).all_null_value = true;
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Some pages do not provide null_count
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0);
+  page_stats.at(1).set_min(encode(1)).set_max(encode(3));
+  page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0);
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // 2nd page does not set anything
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(2).set_min(encode(3)).set_max(encode(4));
+
+  ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
+  auto builder = ColumnIndexBuilder::Make(&descr);
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+  ASSERT_EQ(nullptr, builder->Build());
+
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  EXPECT_EQ(0, buffer->size());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
+  schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
+  schema::NodePtr root = schema::GroupNode::Make("schema", Repetition::REPEATED, fields);
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  auto builder = PageIndexBuilder::Make(&schema);
+
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException);
+  ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException);
+
+  // Finish the builder without calling AppendRowGroup().
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo does not write anything.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  ASSERT_EQ(0, buffer->size());
+  ASSERT_TRUE(location.column_index_location.empty());
+  ASSERT_TRUE(location.offset_index_location.empty());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithSingleRowGroup) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED,
+      {schema::ByteArray("c1"), schema::ByteArray("c2"), schema::ByteArray("c3")});
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  // Prepare page stats and page locations.
+  const std::vector<EncodedStatistics> page_stats = {
+      EncodedStatistics().set_null_count(0).set_min("a").set_max("b"),
+      EncodedStatistics().set_null_count(0).set_min("A").set_max("B")};
+  const std::vector<PageLocation> page_locations = {
+      {/*offset=*/128, /*compressed_page_size=*/512,
+       /*first_row_index=*/0},
+      {/*offset=*/1024, /*compressed_page_size=*/512,
+       /*first_row_index=*/0}};
+  const int64_t final_position = 200;
+
+  // Create builder and add pages of single row group.
+  // Note that the 3rd column does not add any pages and its page index is disabled.
+  auto builder = PageIndexBuilder::Make(&schema);
+  ASSERT_NO_THROW(builder->AppendRowGroup());
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->AddPage(page_stats.at(i)));
+    ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->Finish());
+    ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->AddPage(page_locations.at(i)));
+    ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->Finish(final_position));
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo only serializes page index of first two columns.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+
+  const size_t num_row_groups = 1;
+  const size_t num_columns = 3;
+  ASSERT_EQ(num_row_groups, location.column_index_location.size());
+  ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+  auto column_index_locations = location.column_index_location[0];
+  auto offset_index_locations = location.offset_index_location[0];
+  ASSERT_EQ(num_columns, column_index_locations.size());
+  ASSERT_EQ(num_columns, offset_index_locations.size());
+
+  auto properties = default_reader_properties();
+  for (int i = 0; i < 3; i++) {
+    if (i < 2) {
+      ASSERT_TRUE(column_index_locations[i].has_value());
+      ASSERT_TRUE(offset_index_locations[i].has_value());
+      auto ci_location = column_index_locations[i].value();
+      auto oi_location = offset_index_locations[i].value();
+
+      auto column_index =
+          ColumnIndex::Make(*schema.Column(i), buffer->data() + ci_location.offset,
+                            static_cast<uint32_t>(ci_location.length), properties);
+      const size_t num_pages = 1;
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[0]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[0]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[0]);
+
+      auto offset_index =
+          OffsetIndex::Make(buffer->data() + oi_location.offset,
+                            static_cast<uint32_t>(oi_location.length), properties);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(page_locations[i].offset + final_position,
+                offset_index->page_locations()[0].offset);
+      ASSERT_EQ(page_locations[i].compressed_page_size,
+                offset_index->page_locations()[0].compressed_page_size);
+      ASSERT_EQ(page_locations[i].first_row_index,
+                offset_index->page_locations()[0].first_row_index);
+    } else {
+      ASSERT_FALSE(column_index_locations[i].has_value());
+      ASSERT_FALSE(offset_index_locations[i].has_value());
+    }
+  }
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithTwoRowGroups) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  // Prepare page stats and page locations for two row groups.
+  const std::vector<std::vector<EncodedStatistics>> page_stats = {
+      /* 1st row group */
+      {EncodedStatistics().set_min("a").set_max("b"),
+       EncodedStatistics().set_null_count(0).set_min("A").set_max("B")},
+      /* 2nd row group */
+      {EncodedStatistics() /* corrupted stats */,
+       EncodedStatistics().set_null_count(0).set_min("bar").set_max("foo")}};
+  const std::vector<std::vector<PageLocation>> page_locations = {
+      /* 1st row group */
+      {{/*offset=*/128, /*compressed_page_size=*/512,
+        /*first_row_index=*/0},
+       {/*offset=*/1024, /*compressed_page_size=*/512,
+        /*first_row_index=*/0}},
+      /* 2nd row group */
+      {{/*offset=*/128, /*compressed_page_size=*/512,
+        /*first_row_index=*/0},
+       {/*offset=*/1024, /*compressed_page_size=*/512,
+        /*first_row_index=*/0}}};
+  const std::vector<int64_t> final_positions = {1024, 2048};
+
+  // Create builder and add pages of two row groups.
+  const size_t num_row_groups = 2;
+  const size_t num_columns = 2;
+  const size_t num_pages = 1;
+  auto builder = PageIndexBuilder::Make(&schema);
+  for (size_t rg = 0; rg < num_row_groups; ++rg) {
+    ASSERT_NO_THROW(builder->AppendRowGroup());
+    for (int c = 0; c < static_cast<int>(num_columns); ++c) {
+      ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->AddPage(page_stats[rg][c]));
+      ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->Finish());
+      ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->AddPage(page_locations[rg][c]));
+      ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->Finish(final_positions[rg]));
+    }
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo only serializes valid page index.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  ASSERT_EQ(num_row_groups, location.column_index_location.size());
+  ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+
+  // Verify data of deserialized page index.
+  auto properties = default_reader_properties();
+  for (size_t rg = 0; rg < num_row_groups; ++rg) {

Review Comment:
   would unwinding the loops + appropriate helper methods make this significantly more verbose?
   
   Having logic in tests, at least for me, makes them much more difficult to read (sometimes it is warranted, but sometimes it is cleaner to try to make things less complex).
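   
   For instance, a hypothetical helper along these lines (names are illustrative, not from the PR) could replace the body of the inner loop:
   
   ```cpp
   // Hypothetical helper: verify one page of a deserialized column index
   // against the encoded stats it was built from.
   void CheckColumnIndexPage(const ColumnIndex& column_index,
                             const EncodedStatistics& page_stats, size_t page) {
     ASSERT_EQ(page_stats.all_null_value, column_index.null_pages()[page]);
     ASSERT_EQ(page_stats.min(), column_index.encoded_min_values()[page]);
     ASSERT_EQ(page_stats.max(), column_index.encoded_max_values()[page]);
   }
   ```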



[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159298271


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than it */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(0);
+    if (rg == 0) {
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_FALSE(column_index->null_pages()[0]);
+      ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+      ASSERT_EQ("short_string", column_index->encoded_min_values()[0]);
+      ASSERT_EQ("short_string", column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(0, column_index->null_counts()[0]);
+    } else {
+      // Large stats have been dropped, and so is the column index.
+      ASSERT_EQ(column_index, nullptr);
+    }
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithMultiplePages) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->data_pagesize(1) /* write multiple pages */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(
+      schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
+               R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+  auto row_group_index_reader = page_index_reader->RowGroup(0);
+  ASSERT_NE(row_group_index_reader, nullptr);
+
+  // Setup expected data.
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(3),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c0_max_values = {encode_int64(2), encode_int64(4),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c1_min_values = {"a", "c", "f", ""};
+  const std::vector<std::string> c1_max_values = {"b", "d", "f", ""};
+  const std::vector<int64_t> null_counts = {0, 0, 1, 2};
+  const std::vector<bool> null_pages = {false, false, false, true};
+  const size_t num_pages = 4;
+
+  // Verify page index by columns.
+  for (int col_idx = 0; col_idx < metadata->num_columns(); ++col_idx) {
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(col_idx);
+    ASSERT_NE(column_index, nullptr);
+    ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+    ASSERT_EQ(num_pages, column_index->null_pages().size());
+    ASSERT_TRUE(column_index->has_null_counts());
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      ASSERT_EQ(null_pages[page_idx], column_index->null_pages()[page_idx]);
+      ASSERT_EQ(null_counts[page_idx], column_index->null_counts()[page_idx]);
+      if (col_idx == 0) {
+        ASSERT_EQ(c0_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c0_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      } else {
+        ASSERT_EQ(c1_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c1_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      }
+    }
+
+    // Verify offset index.
+    auto offset_index = row_group_index_reader->GetOffsetIndex(col_idx);
+    ASSERT_NE(offset_index, nullptr);
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    int64_t prev_offset = 0;
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      const auto& page_location = offset_index->page_locations()[page_idx];
+      ASSERT_EQ(static_cast<int64_t>(page_idx * 2), page_location.first_row_index);
+      ASSERT_GT(page_location.offset, prev_offset);

Review Comment:
   Unfortunately we don't. We may need to change the `PageReader` to return a list of offsets to data pages; otherwise we have no idea. The only thing we are sure of is that the page offsets are in ascending order, which is why I use `ASSERT_GT` to check the offsets.



[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1119541295


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1482,15 +1538,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset,
-                         int64_t num_levels) {
+                         int64_t num_levels, int64_t num_nulls) {

Review Comment:
   You're right. I have added a doc string to explain them.
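   
   A doc string for these parameters might look like the sketch below (my paraphrase; the exact wording in the PR may differ):
   
   ```cpp
   // Paraphrased sketch of the added documentation:
   // \param num_spaced_values number of buffered values including null slots
   // \param num_levels number of definition/repetition levels in this call,
   //   which can exceed the number of leaf values for nested columns
   // \param num_nulls number of nulls among the levels, so that null counts
   //   can be tracked without rescanning the validity bitmap
   ```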



[GitHub] [arrow] mapleFU commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1118029417


##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   I met the same problem here. I think the semantics of "has_xxx" work like this. For a writer:
   * The writer can guarantee that the null count is correct (assuming it has no bugs).
   * Currently I found that ndv is never collected. If a user collects ndv in page 1 but not in page 2, it should be abandoned.
   
   For a reader:
   * When deserializing, the reader should assume that ndv and null_count can be unset (but currently it doesn't work like this).
   * Deserialized statistics can call merge, but if either `null_count` or `ndv` is unset, the merged null_count should be discarded entirely (see the sketch below).
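   
   A minimal sketch of that merge rule (hypothetical types, not the actual `statistics.cc` code):
   
   ```cpp
   #include <cstdint>
   
   // Hypothetical sketch: once either side is missing its null count, the
   // merged result must drop the field rather than report a partial sum.
   struct StatsSketch {
     bool has_null_count = false;
     int64_t null_count = 0;
   };
   
   StatsSketch Merge(const StatsSketch& a, const StatsSketch& b) {
     StatsSketch merged;
     merged.has_null_count = a.has_null_count && b.has_null_count;
     if (merged.has_null_count) {
       merged.null_count = a.null_count + b.null_count;
     }
     return merged;
   }
   ```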



[GitHub] [arrow] ursabot commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1502980669

   Benchmark runs are scheduled for baseline = a7c4e05adee7a568710ad0f62198a9614c0c9594 and contender = 0cf4ffaa90de6f036c363ac95d83474d6a1dfcc2. 0cf4ffaa90de6f036c363ac95d83474d6a1dfcc2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/21c6a76563e1419b9c2f4357db177b18...56b367950c54484aa2eb7be5f2cd868c/)
   [Finished :arrow_down:0.62% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/f3f05965eae14dbca934dd21534dca74...ffa8d8c5614e44efbfbdaeff75b391e6/)
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/839ca730c9854d4aa77eaae14138c9f6...a080237f585e4f658575ebc51e50c235/)
   [Finished :arrow_down:0.15% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/09e86445764341e7a434b92108977c0d...e1f9f5694bd3420c8e3530dfba1c1421/)
   Buildkite builds:
   [Finished] [`0cf4ffaa` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2668)
   [Finished] [`0cf4ffaa` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2701)
   [Finished] [`0cf4ffaa` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2666)
   [Finished] [`0cf4ffaa` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2692)
   [Finished] [`a7c4e05a` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2667)
   [Finished] [`a7c4e05a` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2700)
   [Finished] [`a7c4e05a` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2665)
   [Finished] [`a7c4e05a` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2691)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1482153656

   sorry still on my radar to look at.


[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132717312


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data; each row group contains at most 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+    }
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());

Review Comment:
   could you remind me why this is null_pages?
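   
   For context: in the Parquet page index, `ColumnIndex` keeps one entry per
   data page in each of its parallel lists (`null_pages`, `min_values`,
   `max_values`), so the length of `null_pages` doubles as the page count.
   A minimal sketch of that reading, reusing names from the test above:
   
   ```cpp
   // null_pages / min_values / max_values are parallel per-page lists, so
   // any of them can serve as the page count; index [0] is the first page.
   ASSERT_EQ(num_pages, c0_column_index->null_pages().size());  // one page
   ASSERT_FALSE(c0_column_index->null_pages()[0]);  // page 0 is not all-null
   ```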





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157955861


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than this */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {

Review Comment:
   Are all the loops necessary in this test? It seems like there should only be two row groups and one column.
   
   Given the repetitiveness of these tests, is there some way to make the logic more readable (some sort of custom gmock matcher?)
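   
   A minimal sketch of such a matcher, assuming the `ColumnIndex` accessors
   used elsewhere in this test (the matcher name and parameters are
   hypothetical):
   
   ```cpp
   #include <gmock/gmock.h>
   
   // Matches a column index pointer whose single page is non-null and has
   // the expected encoded min/max values.
   MATCHER_P2(HasSinglePageMinMax, expected_min, expected_max, "") {
     return arg != nullptr && arg->null_pages().size() == 1 &&
            !arg->null_pages()[0] &&
            arg->encoded_min_values()[0] == expected_min &&
            arg->encoded_max_values()[0] == expected_max;
   }
   
   // Usage:
   //   EXPECT_THAT(row_group_index_reader->GetColumnIndex(0),
   //               HasSinglePageMinMax(c0_min_values[rg], c0_max_values[rg]));
   ```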





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157982396


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of the page index builder.
+enum class BuilderState {
+  /// Created; no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and has been discarded.
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page invalidates the null_counts of the whole column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   doesn't logical type need to be accounted for here for things like Decimal type?





[GitHub] [arrow] etseidl commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "etseidl (via GitHub)" <gi...@apache.org>.
etseidl commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426607039

   > Yes I have already noticed that a record may span across different pages. But in the parquet-cpp, the page size check always happens at the end of each batch. Therefore it guarantees that a page will not split any record. Please check this function as well as where it is called for reference: https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_writer.cc#L1376
   
   Perhaps I'm misunderstanding, but it appears that the function you referenced is called after a batch of values is written...I don't see where it is guaranteed that the end of a batch is also the end of a row.  But thanks for working on the page indexes, I think it's an important feature that arrow-cpp currently lacks.
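   
   For reference, the record-boundary rule under discussion can be expressed
   directly in terms of repetition levels: a value starts a new record exactly
   when its repetition level is 0. A minimal sketch (the function name is
   hypothetical):
   
   ```cpp
   #include <cstdint>
   
   // A page may only be closed right before a value that starts a new
   // record, i.e. one whose repetition level is 0. The end of the buffered
   // levels is treated as a safe split point.
   bool CanClosePageBefore(const int16_t* rep_levels, int64_t index,
                           int64_t num_levels) {
     return index >= num_levels || rep_levels[index] == 0;
   }
   ```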




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1445998528

   > Apologies if I missed it but do you plan to add truncation functionality for large stat values? Also, not sure what parquet-mr does here but should we stop saving stats on individual pages if the functionality is turned on.
   
   No, stats truncation is not part of the plan here. We can open a new issue to disable writing stats in the page header (which is deprecated by the spec, IIRC) once writing the page index is supported.




[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1119540850


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5147,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()

Review Comment:
   Added a check here: https://github.com/apache/arrow/pull/34054/files#diff-4bc4a077197b0bc71088567adb45a3a20e850290d76cb7af52e2f6b8c19ff6ebR5091





[GitHub] [arrow] kou commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1117615202


##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   It seems that this is a correct change.
   How about adding tests related to this to `cpp/src/parquet/statistics_test.cc`, or splitting this into another pull request?





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132725203


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of the page index builder.
+enum class BuilderState {
+  /// Created; no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and has been discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page invalidates the null_counts of the whole column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);
+    } catch (const ParquetException&) {
+      /// Simply return unordered for unsupported comparator.
+      return BoundaryOrder::Unordered;
+    }
+
+    /// Check if both min_values and max_values are in ascending order.
+    bool is_ascending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i], min_values[i - 1]) ||

Review Comment:
   this doesn't look right; should this have an inequality?
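   
   For reference, `TypedComparator::Compare(a, b)` in parquet-cpp returns true
   iff `a` sorts strictly before `b`, so the call above reads as
   `min_values[i] < min_values[i - 1]`, i.e. a break in ascending order. A
   standalone sketch of the same check (names are illustrative):
   
   ```cpp
   #include <cstddef>
   #include <vector>
   
   // Returns true when values are in non-decreasing order under cmp, where
   // cmp.Compare(a, b) is a strict less-than.
   template <typename T, typename Comparator>
   bool IsAscending(const std::vector<T>& values, Comparator& cmp) {
     for (size_t i = 1; i < values.size(); ++i) {
       if (cmp.Compare(values[i], values[i - 1])) {
         return false;  // values[i] < values[i - 1]: order is broken
       }
     }
     return true;
   }
   ```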





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157979542


##########
cpp/src/parquet/metadata.h:
##########
@@ -506,6 +506,21 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// \brief Public struct for the locations of all page indexes in a parquet file.
+struct PageIndexLocation {
+  /// Alias type of page index location of a row group. The index location
+  /// is keyed by column ordinal. If the column does not have a page index,
+  /// its value is set to std::nullopt.
+  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+  /// Alis type of page index location of a parquet file. The index location

Review Comment:
   ```suggestion
     /// Alias type of page index location of a parquet file. The index location
   ```





[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426636699

   > Please correct me if I am wrong. At least the arrow parquet writer guarantees this by calling ColumnWriter::WriteArrow like this: https://github.com/apache/arrow/blob/master/cpp/src/parquet/arrow/writer.cc#L154. Yes, the ParquetFileWriter itself does not prevent this.
   
   I think @etseidl is correct here: WriteArrow works on leaf arrays, and IIRC the rep/def levels in the referenced code are the only way to recover record boundaries. Sorry, it's been a busy week; I will aim to catch up on reviews next week. It would also be nice to not special-case this for Arrow even if it does somehow work there.




[GitHub] [arrow] etseidl commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "etseidl (via GitHub)" <gi...@apache.org>.
etseidl commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1450568424

   > Yes, simply dropping stats other than truncating stats is less error-prone. This in turn will drop column index of that column.
   
   I agree, truncating UTF-8 encoded strings is a pain (I know, I've done it). But the default cutoff for statistics is 4KB. That can lead to some pretty big footers when you have hundreds or thousands of pages. For comparison, parquet-mr uses a default of 64 bytes as a max for the column index statistics. I'm not saying truncation has to be part of this PR, just that it should be considered in the future.
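   
   For illustration, truncating a *min* value is the easy half of what is
   described above: cutting at a UTF-8 code point boundary always yields a
   valid lower bound. A rough sketch (not part of this PR; the function name
   is hypothetical):
   
   ```cpp
   #include <string>
   
   // Truncate a min statistic to at most max_bytes without splitting a
   // multi-byte UTF-8 code point. The result sorts <= the original, so it
   // remains a valid lower bound.
   std::string TruncateMin(const std::string& value, size_t max_bytes) {
     if (value.size() <= max_bytes) return value;
     size_t len = max_bytes;
     // Back up while the first dropped byte is a UTF-8 continuation byte
     // (10xxxxxx), so bytes [0, len) form whole code points.
     while (len > 0 && (static_cast<unsigned char>(value[len]) & 0xC0) == 0x80) {
       --len;
     }
     return value.substr(0, len);
   }
   ```
   
   A *max* value additionally needs its last code point incremented so the
   shortened bound still sorts at or above every value on the page, which is
   where most of the pain lives.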




[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1118482264


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1482,15 +1538,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset,
-                         int64_t num_levels) {
+                         int64_t num_levels, int64_t num_nulls) {

Review Comment:
   we should clarify what num_nulls means here. We've had problems in the past with the interpretation of empty lists as nulls (I believe this is what the parquet spec expects), and num_nulls is, I think, usually derived from Arrow's concept, but I could be mistaken.
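   
   A small sketch of the ambiguity being raised, assuming def_levels are the
   definition levels for the page and max_def_level marks a fully defined
   leaf value:
   
   ```cpp
   #include <cstdint>
   
   // Counts entries that are not fully defined leaf values. Note this lumps
   // together null leaf values and empty/null ancestor lists, which is
   // exactly why "num_nulls" needs a precise definition for nested columns.
   int64_t CountMissingLeaves(const int16_t* def_levels, int64_t num_levels,
                              int16_t max_def_level) {
     int64_t missing = 0;
     for (int64_t i = 0; i < num_levels; ++i) {
       if (def_levels[i] < max_def_level) ++missing;
     }
     return missing;
   }
   ```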





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132732335


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data so that each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;

Review Comment:
   It would be good to also verify multiple pages in a row group, end to end.
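   
   One way to cover that end to end is to shrink the page size thresholds so
   the writer is forced to split pages; a sketch of the property setup (exact
   page counts depend on the write batch size, since the size check runs per
   batch):
   
   ```cpp
   auto writer_properties = WriterProperties::Builder()
                                .enable_write_page_index()
                                ->data_pagesize(1)      // force tiny pages
                                ->write_batch_size(2)   // check size every 2 values
                                ->max_row_group_length(8)
                                ->build();
   ```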





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157980299


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of the page index builder.
+enum class BuilderState {
+  /// Created; no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and has been discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page invalidates the null_counts of the whole column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;

Review Comment:
   does it pay to clear null_counts here?
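   
   A sketch of the eager variant being asked about, which drops the buffered
   counts as soon as one page fails to provide them instead of waiting for
   Finish() (the helper name is hypothetical):
   
   ```cpp
   void InvalidateNullCounts(format::ColumnIndex* column_index) {
     column_index->__isset.null_counts = false;
     column_index->null_counts.clear();          // discard buffered counts
     column_index->null_counts.shrink_to_fit();  // release their memory
   }
   ```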





[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1475656836

   > Sorry will try to do a review pass over this weekend.
   
   Thank you very much! @emkornfield 




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1443155683

   This is ready to review now. Please take a look when you have time, thanks! @emkornfield @wjones127 @pitrou 




[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159299215


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable the page index in the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than this */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(0);
+    if (rg == 0) {
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_FALSE(column_index->null_pages()[0]);
+      ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+      ASSERT_EQ("short_string", column_index->encoded_min_values()[0]);
+      ASSERT_EQ("short_string", column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(0, column_index->null_counts()[0]);
+    } else {
+      // Large stats have been dropped, and so has the column index.
+      ASSERT_EQ(column_index, nullptr);
+    }
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);

Review Comment:
   I found a related discussion here: https://groups.google.com/a/chromium.org/g/chromium-dev/c/3aQU5iM5Ov0





[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1496824803

   Starting to look again now, sorry for the delay.




[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1161374999


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of the page index builder.
+enum class BuilderState {
+  /// Created; no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and has been discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page invalidates the null_counts of the whole column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   Yes, actually the sort order is deduced from either logical_type or converted_type as below:
   
   ```cpp
     SortOrder::type sort_order() const {
       auto la = logical_type();
       auto pt = physical_type();
       return la ? GetSortOrder(la, pt) : GetSortOrder(converted_type(), pt);
     }
   ```
   
   Details can be found here: https://github.com/apache/arrow/blob/main/cpp/src/parquet/schema.h. So I think we are good. @wjones127 @emkornfield





[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1443521191

   Opened an issue for the CI failure, which is unrelated to this PR: https://github.com/apache/arrow/issues/34328




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1423771211

   I have encountered several issues while working on this patch. They must be resolved before proceeding with it, so I have switched the PR to draft and will work on the blocking issues first.




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426616058

   > > Yes I have already noticed that a record may span across different pages. But in the parquet-cpp, the page size check always happens at the end of each batch. Therefore it guarantees that a page will not split any record. Please check this function as well as where it is called for reference: https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_writer.cc#L1376
   > 
   > Perhaps I'm misunderstanding, but it appears that the function you referenced is called after a batch of values is written...I don't see where it is guaranteed that the end of a batch is also the end of a row. But thanks for working on the page indexes, I think it's an important feature that arrow-cpp currently lacks.
   
   Please correct me if I am wrong. At least the arrow parquet writer guarantees this by calling `ColumnWriter::WriteArrow` like this: https://github.com/apache/arrow/blob/master/cpp/src/parquet/arrow/writer.cc#L154. Yes, the ParquetFileWriter itself does not prevent this.
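
   To make the invariant under discussion concrete, here is a minimal sketch (the names are illustrative, not the writer's actual code): with nested columns, a value starts a new record iff its repetition level is 0, so a page boundary placed at `offset` is only safe at such a position.

   ```cpp
   #include <cstdint>

   // Sketch: a data page may only begin where the next value opens a new
   // record, i.e. where the repetition level is 0.
   bool IsRecordBoundary(const int16_t* rep_levels, int64_t offset) {
     return offset == 0 || rep_levels[offset] == 0;
   }
   ```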


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1482154744

   > sorry still on my radar to look at.
   
   No problem. It is never easy to review a large PR like this :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132727356


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder has corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page was added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);
+    } catch (const ParquetException&) {
+      /// Simply return unordered for unsupported comparator.
+      return BoundaryOrder::Unordered;
+    }
+
+    /// Check if both min_values and max_values are in ascending order.
+    bool is_ascending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i], min_values[i - 1]) ||

Review Comment:
   or is this <= relationship for compare?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1474062547

   Sorry, will try to do a review pass over this weekend.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133370962


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]

Review Comment:
   Added a list column to verify the page index of a repeated column.



##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder has corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page was added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);
+    } catch (const ParquetException&) {
+      /// Simply return unordered for unsupported comparator.
+      return BoundaryOrder::Unordered;
+    }
+
+    /// Check if both min_values and max_values are in ascending order.
+    bool is_ascending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i], min_values[i - 1]) ||

Review Comment:
   The `Compare` function tests whether the 1st value is strictly less than the 2nd value, as shown below:
   ```cpp
     /// \brief Scalar comparison of two elements, return true if first
     /// is strictly less than the second
     virtual bool Compare(const T& a, const T& b) = 0;
   ```
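
   To make that concrete, a small self-contained sketch of the same check with plain `<` standing in for the comparator (illustrative only): a page sequence may be declared ascending iff no min/max value is strictly less than its predecessor.

   ```cpp
   #include <cstddef>
   #include <vector>

   // Sketch: with a strict less-than comparison, the sequence is
   // non-descending iff Compare(v[i], v[i - 1]) never holds.
   bool IsNonDescending(const std::vector<int32_t>& v) {
     for (std::size_t i = 1; i < v.size(); ++i) {
       if (v[i] < v[i - 1]) return false;  // stands in for comparator->Compare
     }
     return true;
   }
   ```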



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};

Review Comment:
   Added `TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped)` to cover this.



##########
cpp/src/parquet/properties.h:
##########
@@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Enable writing page index.
+    ///
+    /// Page index contains statistics for data pages and can be used to skip pages
+    /// when scanning data in ordered and unordered columns.
+    ///
+    /// Please check the link below for more details:
+    /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    ///
+    /// Default disabled.
+    Builder* enable_write_page_index() {

Review Comment:
   Agree. I will work on it once this PR gets merged.



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;

Review Comment:
   Added `TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithMultiplePages)` to cover this.



##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, BoundaryOrder::Unordered,

Review Comment:
   There is a comment at line 528 already saying that the data has no specific order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1470257415

   Gentle ping @emkornfield 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1465557351

   All feedback has been addressed. Could you please take a look again? @emkornfield 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1491505218

   Gentle ping @emkornfield :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157974412


##########
cpp/src/parquet/column_page.h:
##########
@@ -64,33 +65,39 @@ class DataPage : public Page {
   Encoding::type encoding() const { return encoding_; }
   int64_t uncompressed_size() const { return uncompressed_size_; }
   const EncodedStatistics& statistics() const { return statistics_; }
+  std::optional<int64_t> first_row_index() const { return first_row_index_; }

Review Comment:
   could we add some docs on when this is expected to be present and what it is (and is it relative to the RowGroup or the complete file)?
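
   Something along these lines, perhaps (the wording, and in particular the row-group-relative semantics, are an assumption to be confirmed, not final documentation):

   ```cpp
   /// \brief Row ordinal of the first row in this data page.
   ///
   /// Only present when the writer tracks page locations for the offset
   /// index; assumed here to be relative to the start of the enclosing
   /// row group, not to the complete file.
   std::optional<int64_t> first_row_index() const { return first_row_index_; }
   ```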



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159304328


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not finished yet.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder has corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;

Review Comment:
   No, I have cleared it at line 508 already.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] kou commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1118018736


##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   #34351/34355 is related.



##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   #34351/#34355 is related.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1118206891


##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   @kou Thanks for the information! Let's solve it in a separate PR, since removing the statistics change does not affect this one.
   
   @mapleFU As you are fixing statistics and hitting the same issue, do you think it is a good idea to solve them all together? I have left some comments in your PR; let's discuss there. I'll rebase my PR once yours is merged. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157957927


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than it */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {

Review Comment:
   or maybe helper functions to consolidate elements into a vector and use EXPECT_THAT assertions over array elements?
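
   For example, a minimal sketch of that idea, assuming gmock is available to the test target (the collected vector and test name are hypothetical):

   ```cpp
   #include <gmock/gmock.h>
   #include <gtest/gtest.h>

   #include <cstdint>
   #include <vector>

   using ::testing::ElementsAre;

   // Sketch: collect the per-row-group values first, then assert once
   // with a matcher instead of repeating ASSERT_EQ per element.
   TEST(PageIndexSketch, NullCountsAcrossRowGroups) {
     std::vector<int64_t> null_counts = {1, 0};  // hypothetical collected values
     EXPECT_THAT(null_counts, ElementsAre(1, 0));
   }
   ```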



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159978892


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteDoubleColumnIndex) {
+  auto encode = [=](double value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+  };
+
+  // Double values with no specific order and without null count.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4));
+  page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5));
+  page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6));
+
+  TestWriteTypedColumnIndex(schema::Double("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteByteArrayColumnIndex) {
+  // Byte array values with identical min/max.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min("bar").set_max("foo");
+  page_stats.at(1).set_min("bar").set_max("foo");
+  page_stats.at(2).set_min("bar").set_max("foo");
+
+  TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteFLBAColumnIndex) {
+  // FLBA values in ascending order, with some null pages.
+  std::vector<EncodedStatistics> page_stats(5);
+  page_stats.at(0).set_min("abc").set_max("ABC");
+  page_stats.at(1).all_null_value = true;
+  page_stats.at(2).set_min("foo").set_max("FOO");
+  page_stats.at(3).all_null_value = true;
+  page_stats.at(4).set_min("xyz").set_max("XYZ");
+
+  auto node =
+      schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
+                                  ConvertedType::NONE, /*length=*/3);
+  TestWriteTypedColumnIndex(std::move(node), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithAllNullPages) {
+  // All values are null.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(100).all_null_value = true;
+  page_stats.at(1).set_null_count(100).all_null_value = true;
+  page_stats.at(2).set_null_count(100).all_null_value = true;
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Some pages do not provide null_count
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0);
+  page_stats.at(1).set_min(encode(1)).set_max(encode(3));
+  page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0);
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // 2nd page does not set anything
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(2).set_min(encode(3)).set_max(encode(4));
+
+  ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
+  auto builder = ColumnIndexBuilder::Make(&descr);
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+  ASSERT_EQ(nullptr, builder->Build());
+
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  EXPECT_EQ(0, buffer->size());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
+  schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
+  schema::NodePtr root = schema::GroupNode::Make("schema", Repetition::REPEATED, fields);
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  auto builder = PageIndexBuilder::Make(&schema);
+
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException);
+  ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException);
+
+  // Finish the builder without calling AppendRowGroup().
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo does not write anything.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  ASSERT_EQ(0, buffer->size());
+  ASSERT_TRUE(location.column_index_location.empty());
+  ASSERT_TRUE(location.offset_index_location.empty());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithSingleRowGroup) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED,
+      {schema::ByteArray("c1"), schema::ByteArray("c2"), schema::ByteArray("c3")});
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  // Prepare page stats and page locations.
+  const std::vector<EncodedStatistics> page_stats = {
+      EncodedStatistics().set_null_count(0).set_min("a").set_max("b"),
+      EncodedStatistics().set_null_count(0).set_min("A").set_max("B")};
+  const std::vector<PageLocation> page_locations = {
+      {/*offset=*/128, /*compressed_page_size=*/512,
+       /*first_row_index=*/0},
+      {/*offset=*/1024, /*compressed_page_size=*/512,
+       /*first_row_index=*/0}};
+  // Finish() shifts every page offset by final_position (see the assertions below).
+  const int64_t final_position = 200;
+
+  // Create builder and add pages of single row group.
+  // Note that the 3rd column does not add any pages and its page index is disabled.
+  auto builder = PageIndexBuilder::Make(&schema);
+  ASSERT_NO_THROW(builder->AppendRowGroup());
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->AddPage(page_stats.at(i)));
+    ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->Finish());
+    ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->AddPage(page_locations.at(i)));
+    ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->Finish(final_position));
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo only serializes page index of first two columns.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+
+  const size_t num_row_groups = 1;
+  const size_t num_columns = 3;
+  ASSERT_EQ(num_row_groups, location.column_index_location.size());
+  ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+  auto column_index_locations = location.column_index_location[0];
+  auto offset_index_locations = location.offset_index_location[0];
+  ASSERT_EQ(num_columns, column_index_locations.size());
+  ASSERT_EQ(num_columns, offset_index_locations.size());
+
+  auto properties = default_reader_properties();
+  for (int i = 0; i < 3; i++) {
+    if (i < 2) {
+      ASSERT_TRUE(column_index_locations[i].has_value());
+      ASSERT_TRUE(offset_index_locations[i].has_value());
+      auto ci_location = column_index_locations[i].value();
+      auto oi_location = offset_index_locations[i].value();
+
+      auto column_index =
+          ColumnIndex::Make(*schema.Column(i), buffer->data() + ci_location.offset,
+                            static_cast<uint32_t>(ci_location.length), properties);
+      const size_t num_pages = 1;
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[0]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[0]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[0]);
+
+      auto offset_index =
+          OffsetIndex::Make(buffer->data() + oi_location.offset,
+                            static_cast<uint32_t>(oi_location.length), properties);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(page_locations[i].offset + final_position,
+                offset_index->page_locations()[0].offset);
+      ASSERT_EQ(page_locations[i].compressed_page_size,
+                offset_index->page_locations()[0].compressed_page_size);
+      ASSERT_EQ(page_locations[i].first_row_index,
+                offset_index->page_locations()[0].first_row_index);
+    } else {
+      ASSERT_FALSE(column_index_locations[i].has_value());
+      ASSERT_FALSE(offset_index_locations[i].has_value());
+    }
+  }
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithTwoRowGroups) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  // Prepare page stats and page locations for two row groups.
+  const std::vector<std::vector<EncodedStatistics>> page_stats = {
+      /* 1st row group */
+      {EncodedStatistics().set_min("a").set_max("b"),
+       EncodedStatistics().set_null_count(0).set_min("A").set_max("B")},
+      /* 2nd row group */
+      {EncodedStatistics() /* corrupted stats */,
+       EncodedStatistics().set_null_count(0).set_min("bar").set_max("foo")}};
+  const std::vector<std::vector<PageLocation>> page_locations = {
+      /* 1st row group */
+      {{/*offset=*/128, /*compressed_page_size=*/512,
+        /*first_row_index=*/0},
+       {/*offset=*/1024, /*compressed_page_size=*/512,
+        /*first_row_index=*/0}},
+      /* 2nd row group */
+      {{/*offset=*/128, /*compressed_page_size=*/512,
+        /*first_row_index=*/0},
+       {/*offset=*/1024, /*compressed_page_size=*/512,
+        /*first_row_index=*/0}}};
+  const std::vector<int64_t> final_positions = {1024, 2048};
+
+  // Create builder and add pages of two row groups.
+  const size_t num_row_groups = 2;
+  const size_t num_columns = 2;
+  const size_t num_pages = 1;
+  auto builder = PageIndexBuilder::Make(&schema);
+  for (size_t rg = 0; rg < num_row_groups; ++rg) {
+    ASSERT_NO_THROW(builder->AppendRowGroup());
+    for (int c = 0; c < static_cast<int>(num_columns); ++c) {
+      ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->AddPage(page_stats[rg][c]));
+      ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->Finish());
+      ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->AddPage(page_locations[rg][c]));
+      ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->Finish(final_positions[rg]));
+    }
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo only serializes valid page index.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  ASSERT_EQ(num_row_groups, location.column_index_location.size());
+  ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+
+  // Verify data of deserialized page index.
+  auto properties = default_reader_properties();
+  for (size_t rg = 0; rg < num_row_groups; ++rg) {

Review Comment:
   I have rewritten the tests. Please take a look, thanks! @emkornfield 





[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1449327396

   > > > Apologies if I missed it but do you plan to add truncation functionality for large stat values?
   > > 
   > > 
   > > Large stats truncation is already implemented by parquet-cpp: https://github.com/apache/arrow/blob/main/cpp/src/parquet/statistics.h#L145. So we are good here.
   > 
   > Does the referenced code only apply to statistics in the chunk metadata (and page headers)? Statistics in the page indexes can and should be truncated since there are no guarantees that those statistics are actually contained in the page. See [second bullet](https://github.com/apache/parquet-format/blob/master/PageIndex.md#technical-approach) under "Some observations:".
   
   It applies to both page stats (in the header) and column chunk stats. In this PR, the page index directly collects page stats after truncation, so it should not be a problem here. @etseidl 
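
   For context, a minimal sketch of how the size cap and the new option fit together (assuming the long-standing `max_statistics_size` builder knob; the 64-byte cap is an arbitrary example value):

   ```cpp
   // Hedged sketch: bound encoded min/max sizes so oversized stats are dropped
   // before they reach page headers, and hence never reach the page index.
   auto writer_properties = parquet::WriterProperties::Builder()
                                .enable_write_page_index()
                                ->max_statistics_size(64)  // arbitrary example cap
                                ->build();
   ```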




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1457398566

   All blocking issues have been resolved. 
   
   Please take a look, thanks! @emkornfield @westonpace @wjones127 




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1426803534

   > > Please correct me if I am wrong. At least the arrow parquet writer guarantees this by calling ColumnWriter::WriteArrow like this: https://github.com/apache/arrow/blob/master/cpp/src/parquet/arrow/writer.cc#L154. Yes, the ParquetFileWriter itself does not prevent this.
   > 
   > I think @etseidl is correct here: WriteArrow works on leaf arrays, and IIRC the rep/def levels in the referenced code are the only way to recover record boundaries. Sorry, it's been a busy week; I will aim to catch up on reviews next week. It would also be nice to not special-case this for Arrow even if it does somehow work there.
   
   @emkornfield Thanks for the explanation! No problem; this PR is not ready for review anyway due to a series of blocking issues ahead.
   
   I strongly agree that writing via arrow should not be a special case. It sounds like splitting pages at record boundaries is a new blocking issue now.
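
   A minimal sketch of the boundary rule in question (my own illustration, not code from this PR): a data page may only end where the next value begins a new record, i.e. where its repetition level is 0.

   ```cpp
   // Hypothetical helper over buffered repetition levels: a page break is legal
   // before index i only if i is past the end or value i starts a new record.
   bool CanEndPageBefore(const int16_t* rep_levels, int64_t num_levels, int64_t i) {
     return i >= num_levels || rep_levels[i] == 0;
   }
   ```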




[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132731150


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));

Review Comment:
   I might be misremembering, but I thought indexes were more stringent around NaN comparison (there was previously undefined behavior; we should probably fix NaN comparisons if that hasn't been done already).





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132734343


##########
cpp/src/parquet/properties.h:
##########
@@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Enable writing page index.
+    ///
+    /// The page index contains statistics for data pages and can be used to
+    /// skip pages when scanning data in both ordered and unordered columns.
+    ///
+    /// Please check the link below for more details:
+    /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    ///
+    /// Disabled by default.
+    Builder* enable_write_page_index() {

Review Comment:
   It might be nice to be able to control which columns get a page index, but that's not necessary here.
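
   For illustration, a hypothetical sketch of per-column control, mirroring the path-based overloads that other per-column writer options use (the string-taking overload below does not exist in this PR and is purely an assumption):

   ```cpp
   // Hypothetical follow-up API: enable the page index globally, then opt a
   // single column out by its dotted path. Only the parameterless toggles
   // exist in this PR.
   auto writer_properties = parquet::WriterProperties::Builder()
                                .enable_write_page_index()
                                ->disable_write_page_index("c2")  // hypothetical overload
                                ->build();
   ```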





[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133282277


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+    }
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);

Review Comment:
   It has already been verified at line 5227. Let me move it after the column index checks to be more intuitive.





[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133468043


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));

Review Comment:
   Added `TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithNaNs)` to cover `NaN`, `+0.0` and `-0.0`.





[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1473971396

   @pitrou Do you have time to take a look? It would be nice to have this included in the v12.0.0 release.




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1419277781

   @pitrou @emkornfield Could you please take a look? The interface and implementation are complete with detailed comments. I will add tests gradually.




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1449362046

   > I didn't follow the code through exactly, but the referenced code seems to simply drop statistics rather than truncate them, per the other comment
   
   Yes, simply dropping stats rather than truncating them is less error-prone. This in turn will drop the column index of that column.




[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1118473924


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5147,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()

Review Comment:
   It would be nice to add an assertion to at least one other test to confirm that the page index is in fact not generated.
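
   A hedged sketch of such a negative check, reusing the reader API exercised elsewhere in this thread (whether a missing page index surfaces as a null row-group reader is an assumption to verify):

   ```cpp
   // With write_page_index left disabled (the default), the round-tripped file
   // should expose no page index at all.
   auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
   auto page_index_reader = reader->GetPageIndexReader();
   ASSERT_NE(page_index_reader, nullptr);
   // Assumption: a row group without any page index yields a null reader.
   ASSERT_EQ(nullptr, page_index_reader->RowGroup(0));
   ```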





[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1449355023

   > It applies to both page stats (in the header) and column chunk stats. In this PR, page index directly collects page stats after truncation. So it should not be a problem here. @etseidl
   
   I didn't follow the code through exactly, but the referenced code seems to simply drop statistics rather than truncate them, per the other comment




[GitHub] [arrow] etseidl commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "etseidl (via GitHub)" <gi...@apache.org>.
etseidl commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1449032119

   > > Apologies if I missed it but do you plan to add truncation functionality for large stat values?
   > 
   > Large stats truncation is already implemented by parquet-cpp: https://github.com/apache/arrow/blob/main/cpp/src/parquet/statistics.h#L145. So we are good here.
   
   Does the referenced code only apply to statistics in the chunk metadata (and page headers)? Statistics in the page indexes can and should be truncated since there are no guarantees that those statistics are actually contained in the page. See [second bullet](https://github.com/apache/parquet-format/blob/master/PageIndex.md#technical-approach) under "Some observations:".
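
   For illustration, a hedged sketch of bound-preserving truncation (my own sketch of the standard technique, not code from this PR): a truncated min only needs to stay <= the true min, so a plain prefix works; a truncated max must stay >= the true max, so the last kept byte is incremented, and truncation fails if every byte is 0xFF.

   ```cpp
   #include <optional>
   #include <string>

   // A prefix is lexicographically <= the original, so it is a valid lower bound.
   std::string TruncateMin(const std::string& v, size_t len) {
     return v.substr(0, len);
   }

   // Increment the last non-0xFF byte of the prefix to get a strict upper bound.
   std::optional<std::string> TruncateMax(const std::string& v, size_t len) {
     std::string out = v.substr(0, len);
     for (size_t i = out.size(); i-- > 0;) {
       if (static_cast<unsigned char>(out[i]) != 0xFF) {
         out[i] = static_cast<char>(static_cast<unsigned char>(out[i]) + 1);
         out.resize(i + 1);
         return out;
       }
     }
     return std::nullopt;  // all bytes are 0xFF: no valid truncated upper bound
   }
   ```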




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1486436402

   @wjones127 @westonpace Could you please take a look?
   
   It would be nice to have this in the 12.0.0 milestone.




[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133277991


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));

Review Comment:
   There is already some logic handling NaN values: https://github.com/apache/arrow/blob/main/cpp/src/parquet/statistics.cc#L300. I will look into it later to see whether more action is required.
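
   For reference, a minimal sketch of the float min/max conventions that logic is getting at (my summary; treat the details as assumptions to check against statistics.cc): NaNs are skipped when accumulating bounds, and signed zeros are normalized so that min uses -0.0 and max uses +0.0.

   ```cpp
   #include <cmath>
   #include <optional>

   struct FloatMinMax {
     std::optional<float> min, max;

     void Update(float v) {
       if (std::isnan(v)) return;  // NaN does not participate in ordering
       if (!min || v < *min) min = v;
       if (!max || v > *max) max = v;
     }

     void Normalize() {
       // -0.0f == +0.0f compares equal, so pin the canonical sign per bound.
       if (min && *min == 0.0f) min = -0.0f;
       if (max && *max == 0.0f) max = +0.0f;
     }
   };
   ```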





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132728639


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, BoundaryOrder::Unordered,

Review Comment:
   Could we add a comment on why this is unordered?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wjones127 merged pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 merged PR #34054:
URL: https://github.com/apache/arrow/pull/34054




[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1159305528


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data has been written but the builder is not yet finished.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Mark the null_counts vector as set initially. A page with a missing
+    /// null_count will invalidate the null_counts of the whole column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   It has already been taken care of because `descr_` knows the logical type and sort order.
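
   For illustration, a minimal sketch of what the descriptor-derived comparator gives us (names follow the surrounding code; the int32 column is just an example):

   ```cpp
   // The comparator built from the column descriptor follows the column's sort
   // order (e.g. unsigned order where the logical type requires it), so the
   // boundary-order check needs no special casing. Compare(a, b) means a < b.
   auto comparator = MakeComparator<Int32Type>(descr_);
   bool is_less = comparator->Compare(min_values[0], min_values[1]);
   ```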





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157982849


##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {

Review Comment:
   ```suggestion
     auto encode = [](int32_t value) {
   ```





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157958750


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<std::string> c2_min_values = {encode_int64(1), encode_int64(3)};
+  const std::vector<std::string> c2_max_values = {encode_int64(2), encode_int64(3)};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+  const std::vector<int64_t> c2_null_counts = {2, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_FALSE(c0_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_FALSE(c1_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
+
+    // Verify column index of c2.
+    auto c2_column_index = row_group_index_reader->GetColumnIndex(2);
+    ASSERT_NE(c2_column_index, nullptr);
+    ASSERT_EQ(num_pages, c2_column_index->null_pages().size());
+    ASSERT_FALSE(c2_column_index->null_pages()[0]);
+    ASSERT_EQ(BoundaryOrder::Ascending, c2_column_index->boundary_order());
+    ASSERT_EQ(c2_min_values[rg], c2_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c2_max_values[rg], c2_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c2_column_index->has_null_counts());
+    ASSERT_EQ(c2_null_counts[rg], c2_column_index->null_counts()[0]);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithLargeStatsDropped) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row row group */
+                               ->max_statistics_size(20) /* drop stats larger than this */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(0);
+    if (rg == 0) {
+      ASSERT_NE(column_index, nullptr);
+      ASSERT_EQ(num_pages, column_index->null_pages().size());
+      ASSERT_FALSE(column_index->null_pages()[0]);
+      ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+      ASSERT_EQ("short_string", column_index->encoded_min_values()[0]);
+      ASSERT_EQ("short_string", column_index->encoded_max_values()[0]);
+      ASSERT_TRUE(column_index->has_null_counts());
+      ASSERT_EQ(0, column_index->null_counts()[0]);
+    } else {
+      // Large stats have been dropped, and so has the column index.
+      ASSERT_EQ(column_index, nullptr);
+    }
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+      ASSERT_NE(0, offset_index->page_locations()[0].offset);
+      ASSERT_NE(0, offset_index->page_locations()[0].compressed_page_size);
+    }
+  }
+}
+
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTripWithMultiplePages) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->data_pagesize(1) /* write multiple pages */
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via Table.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto table = ::arrow::TableFromJSON(
+      schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
+               R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"});
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+  auto row_group_index_reader = page_index_reader->RowGroup(0);
+  ASSERT_NE(row_group_index_reader, nullptr);
+
+  // Setup expected data.
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(3),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c0_max_values = {encode_int64(2), encode_int64(4),
+                                                  encode_int64(6), ""};
+  const std::vector<std::string> c1_min_values = {"a", "c", "f", ""};
+  const std::vector<std::string> c1_max_values = {"b", "d", "f", ""};
+  const std::vector<int64_t> null_counts = {0, 0, 1, 2};
+  const std::vector<bool> null_pages = {false, false, false, true};
+  const size_t num_pages = 4;
+
+  // Verify page index by columns.
+  for (int col_idx = 0; col_idx < metadata->num_columns(); ++col_idx) {
+    // Verify column index.
+    auto column_index = row_group_index_reader->GetColumnIndex(col_idx);
+    ASSERT_NE(column_index, nullptr);
+    ASSERT_EQ(BoundaryOrder::Ascending, column_index->boundary_order());
+    ASSERT_EQ(num_pages, column_index->null_pages().size());
+    ASSERT_TRUE(column_index->has_null_counts());
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      ASSERT_EQ(null_pages[page_idx], column_index->null_pages()[page_idx]);
+      ASSERT_EQ(null_counts[page_idx], column_index->null_counts()[page_idx]);
+      if (col_idx == 0) {
+        ASSERT_EQ(c0_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c0_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      } else {
+        ASSERT_EQ(c1_min_values[page_idx], column_index->encoded_min_values()[page_idx]);
+        ASSERT_EQ(c1_max_values[page_idx], column_index->encoded_max_values()[page_idx]);
+      }
+    }
+
+    // Verify offset index.
+    auto offset_index = row_group_index_reader->GetOffsetIndex(col_idx);
+    ASSERT_NE(offset_index, nullptr);
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    int64_t prev_offset = 0;
+    for (size_t page_idx = 0; page_idx < num_pages; ++page_idx) {
+      const auto& page_location = offset_index->page_locations()[page_idx];
+      ASSERT_EQ(static_cast<int64_t>(page_idx * 2), page_location.first_row_index);
+      ASSERT_GT(page_location.offset, prev_offset);

Review Comment:
   My main question here: how do we know the actual offset is correct (same with the other tests)?
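
   One way to pin down the absolute offset (an illustrative sketch, not code
   from this PR) is to cross-check the first page location against the column
   chunk metadata, which records where the first data page starts:

   ```cpp
   // The offset index's first entry should agree with the footer metadata:
   // the first data page of a chunk starts at data_page_offset().
   auto cc_metadata = metadata->RowGroup(0)->ColumnChunk(col_idx);
   ASSERT_EQ(cc_metadata->data_page_offset(),
             offset_index->page_locations()[0].offset);
   ```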





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157937787


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,361 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", ::arrow::list(::arrow::int64()))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])");
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {

Review Comment:
   ```suggestion
     auto encode_int64 = [](int64_t value) {
   ```
   Or is the capture doing something there?





[GitHub] [arrow] github-actions[bot] commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1419259506

   :warning: GitHub issue #34053 **has been automatically assigned in GitHub** to PR creator.




[GitHub] [arrow] emkornfield commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1445995361

   Apologies if I missed it, but do you plan to add truncation functionality for large stat values? Also, I'm not sure what parquet-mr does here, but should we stop saving stats on individual pages if this functionality is turned on?
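
   For reference, a rough sketch of the usual binary-truncation approach
   (helper names are hypothetical, and this ignores the sort-order and UTF-8
   concerns a real implementation would need to handle): a truncated min stays
   a valid lower bound as a plain prefix, while a truncated max must be bumped
   upward to remain a valid upper bound.

   ```cpp
   #include <optional>
   #include <string>

   // Any prefix of min is still a valid (possibly smaller) lower bound
   // under unsigned lexicographic byte order.
   std::string TruncateMin(const std::string& min, size_t max_len) {
     return min.size() <= max_len ? min : min.substr(0, max_len);
   }

   // A plain prefix of max may sort below max, so increment the last byte
   // that is not 0xFF to keep a valid upper bound. Returns nullopt when
   // every byte is 0xFF and no shorter valid bound exists.
   std::optional<std::string> TruncateMax(const std::string& max, size_t max_len) {
     if (max.size() <= max_len) return max;
     std::string prefix = max.substr(0, max_len);
     for (size_t i = prefix.size(); i-- > 0;) {
       if (static_cast<unsigned char>(prefix[i]) != 0xFF) {
         prefix[i] = static_cast<char>(static_cast<unsigned char>(prefix[i]) + 1);
         prefix.resize(i + 1);
         return prefix;
       }
     }
     return std::nullopt;  // caller keeps the untruncated value instead
   }
   ```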




[GitHub] [arrow] kou commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1117608291


##########
cpp/src/parquet/statistics.cc:
##########
@@ -494,6 +494,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
                       int64_t null_count, int64_t distinct_count, bool has_min_max,
                       bool has_null_count, bool has_distinct_count, MemoryPool* pool)
       : TypedStatisticsImpl(descr, pool) {
+    has_null_count_ = has_null_count;
+    has_distinct_count_ = has_distinct_count;

Review Comment:
   This is related to the GLib failure.
   If this is a correct change, we can change the GLib test:
   
   ```diff
   diff --git a/c_glib/test/parquet/test-statistics.rb b/c_glib/test/parquet/test-statistics.rb
   index 4f21ebf00c..0367084c88 100644
   --- a/c_glib/test/parquet/test-statistics.rb
   +++ b/c_glib/test/parquet/test-statistics.rb
   @@ -51,7 +51,7 @@ class TestParquetStatistics < Test::Unit::TestCase
    
      test("#has_n_distinct_values?") do
        assert do
   -      @statistics.has_n_distinct_values?
   +      not @statistics.has_n_distinct_values?
        end
      end
    
   ```





[GitHub] [arrow] wjones127 commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1501959909

   I will merge this later today if @emkornfield has no further comments.




[GitHub] [arrow] wgtmac commented on pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34054:
URL: https://github.com/apache/arrow/pull/34054#issuecomment-1501286908

   > This looks good to me. I also tested it locally by turning on the page index by default and verifying the tests pass for C++ and Python (after changing assertions that it is turned off by default).
   
   Thanks for your review!




[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157977282


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -335,6 +340,10 @@ class SerializedPageWriter : public PageWriter {
     if (meta_encryptor_ != nullptr) {
       UpdateEncryption(encryption::kColumnMetaData);
     }
+
+    // Serialized page writer does not need to adjust page offsets.
+    FinishPageIndex(/*final_position=*/0);

Review Comment:
   Maybe FinishPageIndexes? (It took looking up the actual method implementation while reading through this to realize that it finishes both the column and offset indexes.)





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157977911


##########
cpp/src/parquet/column_writer.h:
##########
@@ -91,7 +93,9 @@ class PARQUET_EXPORT PageWriter {
       bool buffered_row_group = false,
       std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
       std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
-      bool page_write_checksum_enabled = false);
+      bool page_write_checksum_enabled = false,
+      ColumnIndexBuilder* column_index_builder = NULLPTR,

Review Comment:
   For completeness, we should clarify that the builders must outlive the writer?
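
   Something along these lines in the doc comment would do (the wording is
   only a suggestion):

   ```cpp
   /// \note If provided, column_index_builder and offset_index_builder must
   /// outlive the returned PageWriter; the writer does not take ownership.
   ```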





[GitHub] [arrow] wjones127 commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1161011992


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of the page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data has been written but not yet finished.
+  kStarted,
+  /// All data has been written and no more writes are allowed.
+  kFinished,
+  /// The builder holds corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Initialize the null_counts vector as set. An invalid null_counts vector
+    /// from any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   `descr_` does, though it doesn't seem to use the logical type here:
   
   https://github.com/apache/arrow/blob/82d48a93d0dc5ea7826207f154d9f6a1250ec57b/cpp/src/parquet/statistics.cc#L811
   
   But it does seem to be handled by the sort order argument:
   
   https://github.com/apache/arrow/blob/82d48a93d0dc5ea7826207f154d9f6a1250ec57b/cpp/src/parquet/statistics_test.cc#L77-L82





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132715248


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]

Review Comment:
   it would be nice to verify a repeated column also.





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132718201


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+    }
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);

Review Comment:
   It seems like we should be verifying the offset indexes as well?





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132719937


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+    }
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
+    ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c0_column_index->has_null_counts());
+    ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);
+
+    // Verify column index of c1.
+    auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
+    ASSERT_NE(c1_column_index, nullptr);
+    ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
+    ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
+    ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
+    ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
+    ASSERT_TRUE(c1_column_index->has_null_counts());
+    ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);

Review Comment:
   Also the first row index?





[GitHub] [arrow] emkornfield commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1132717745


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};

Review Comment:
   Should we have a test verifying that statistics with large values get dropped?





[GitHub] [arrow] wgtmac commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1133275238


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
+  // Enable page index to the writer.
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data and each row group contains 4 rows.
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [1,     "a"],
+      [2,     "b"],
+      [3,     "c"],
+      [null,  "d"],
+      [5,     null],
+      [6,     "f"]
+    ])");
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
+                             arrow_writer_properties, &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  // Create reader to read page index.
+  auto read_properties = default_arrow_reader_properties();
+  auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  auto metadata = reader->metadata();
+  ASSERT_EQ(2, metadata->num_row_groups());
+
+  // Make sure page index reader is not null.
+  auto page_index_reader = reader->GetPageIndexReader();
+  ASSERT_NE(page_index_reader, nullptr);
+
+  auto encode_int64 = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
+  const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
+  const std::vector<std::string> c1_min_values = {"a", "f"};
+  const std::vector<std::string> c1_max_values = {"d", "f"};
+  const std::vector<int64_t> c0_null_counts = {1, 0};
+  const std::vector<int64_t> c1_null_counts = {0, 1};
+
+  const size_t num_pages = 1;
+  for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+    auto row_group_index_reader = page_index_reader->RowGroup(rg);
+    ASSERT_NE(row_group_index_reader, nullptr);
+
+    // Verify offset index.
+    for (int c = 0; c < metadata->num_columns(); ++c) {
+      auto offset_index = row_group_index_reader->GetOffsetIndex(c);
+      ASSERT_NE(offset_index, nullptr);
+      ASSERT_EQ(num_pages, offset_index->page_locations().size());
+      ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
+    }
+
+    // Verify column index of c0.
+    auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
+    ASSERT_NE(c0_column_index, nullptr);
+    ASSERT_EQ(num_pages, c0_column_index->null_pages().size());

Review Comment:
   `null_pages` is a required list indicating whether each page is a null page. So its size can be used to count the number of pages here.
   
   ```thrift
   struct ColumnIndex {
     /**
      * A list of Boolean values to determine the validity of the corresponding
      * min and max values. If true, a page contains only null values, and writers
      * have to set the corresponding entries in min_values and max_values to
      * byte[0], so that all lists have the same length. If false, the
      * corresponding entries in min_values and max_values must be valid.
      */
     1: required list<bool> null_pages
   
     /**
      * Two lists containing lower and upper bounds for the values of each page
      * determined by the ColumnOrder of the column. These may be the actual
      * minimum and maximum values found on a page, but can also be (more compact)
     * values that do not exist on a page. For example, instead of storing "Blart
      * Versenwald III", a writer may set min_values[i]="B", max_values[i]="C".
      * Such more compact values must still be valid values within the column's
      * logical type. Readers must make sure that list entries are populated before
      * using them by inspecting null_pages.
      */
     2: required list<binary> min_values
     3: required list<binary> max_values
   
     /**
      * Stores whether both min_values and max_values are ordered and if so, in
      * which direction. This allows readers to perform binary searches in both
      * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
      * if the lists are ordered.
      */
     4: required BoundaryOrder boundary_order
   
     /** A list containing the number of null values for each page **/
     5: optional list<i64> null_counts
   }
   ```
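
   In other words, readers must consult `null_pages` before touching the
   min/max entries. A minimal sketch using the reader API exercised in this
   PR's tests:

   ```cpp
   // null_pages() has one entry per page, so it doubles as a page count;
   // min/max entries for all-null pages are placeholders and must be skipped.
   const auto& null_pages = column_index->null_pages();
   for (size_t i = 0; i < null_pages.size(); ++i) {
     if (null_pages[i]) continue;  // page contains only nulls
     const std::string& min = column_index->encoded_min_values()[i];
     const std::string& max = column_index->encoded_max_values()[i];
     // ... use min/max ...
   }
   ```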


