You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "wjones127 (via GitHub)" <gi...@apache.org> on 2023/04/07 22:58:35 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #34054: GH-34053: [C++][Parquet] Write parquet page index

wjones127 commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1161011992


##########
cpp/src/parquet/page_index.cc:
##########
@@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal lifecycle state of a page index builder.
+///
+/// In ColumnIndexBuilderImpl below, a builder starts in kCreated, moves to
+/// kStarted once a page is added, and ends in either kFinished or kDiscarded.
+enum class BuilderState {
+  /// Created, but no data has been written yet.
+  kCreated,
+  /// Some data has been written, but the builder is not finished yet.
+  kStarted,
+  /// All data has been written; no further writes are allowed.
+  kFinished,
+  /// The builder saw corrupted or empty data, so its output is discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  /// \brief Create a column index builder for the column described by `descr`.
+  ///
+  /// Only the pointer is stored. NOTE(review): presumably `descr` must outlive
+  /// this builder; confirm with callers.
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) {
+    /// Start with null_counts marked as set. AddPage() clears this flag as soon
+    /// as any page lacks a null count, which invalidates null_counts for the
+    /// whole column index.
+    column_index_.__isset.null_counts = true;
+    /// Default to UNORDERED; Finish() replaces this with the detected order.
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  /// \brief Record the encoded statistics of the next data page.
+  ///
+  /// The outcome depends on the page's statistics:
+  /// - An all-null page is recorded with null_pages = true and empty min/max
+  ///   placeholders.
+  /// - A page with both min and max records them in encoded form. Its ordinal
+  ///   is also remembered so Finish() can decode the value for boundary-order
+  ///   detection.
+  /// - A non-null page without min/max discards the whole column index.
+  ///
+  /// null_counts is kept only while every recorded page provides a null count.
+  /// Calls made after the builder has been discarded are ignored.
+  ///
+  /// \throws ParquetException if the builder has already been finished.
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      /// The current size of null_pages is this page's ordinal, because exactly
+      /// one entry is appended to null_pages per recorded page.
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page, but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    /// Once any page lacks a null count, stop collecting null_counts; Finish()
+    /// clears whatever was collected before that point.
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+    }
+  }
+
+  /// \brief Finalize the column index so it can be written or built.
+  ///
+  /// If no page was ever added, the column index is discarded. Otherwise:
+  /// - null_counts is cleared if any page lacked a null count.
+  /// - The boundary order is derived from the decoded min/max values of the
+  ///   non-null pages.
+  ///
+  /// Calling this on a discarded builder is a no-op.
+  ///
+  /// \throws ParquetException if the builder has already been finished.
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page has been added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear the null_counts vector because at least one page did not provide
+    /// a null count.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode the min/max bytes of the non-null pages with a PLAIN decoder for
+    /// this column's physical type, so they can be compared as values of type T.
+    /// All-null pages are skipped; their min/max entries are empty placeholders.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), &min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), &max_values, i);
+    }
+
+    /// Decide the boundary order from the decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  /// \brief Serialize the Thrift column index to `sink`.
+  ///
+  /// Writes nothing unless the builder is in the kFinished state. For example,
+  /// nothing is written when the column index was discarded or Finish() was
+  /// not called.
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  /// \brief Build an in-memory ColumnIndex from the collected page data.
+  ///
+  /// \return a typed column index built from a copy of the Thrift column
+  ///         index, or nullptr if the builder is not in the kFinished state
+  ///         (including when it was discarded).
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);

Review Comment:
   `descr_` does, but it doesn't seem to use the logical type here:
   
   https://github.com/apache/arrow/blob/82d48a93d0dc5ea7826207f154d9f6a1250ec57b/cpp/src/parquet/statistics.cc#L811
   
   But it does seem to be handled by the sort order argument:
   
   https://github.com/apache/arrow/blob/82d48a93d0dc5ea7826207f154d9f6a1250ec57b/cpp/src/parquet/statistics_test.cc#L77-L82



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org