You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mapleFU (via GitHub)" <gi...@apache.org> on 2023/02/22 03:16:39 UTC

[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113775613


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {

Review Comment:
   So the number of levels actually written in one batch could be larger than `batch_size`? That is, the batch would be extended to include every level up to the next record boundary (the first position where `rep_level == 0`)?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1373,11 +1426,13 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
   }
 
-  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
+  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
+                                    bool check_page) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
 
-    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    if (check_page &&

Review Comment:
   Can we regard `!check_page` as forcing no page flush?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;

Review Comment:
   What if `num_levels` is 0 and `end_offset` is 0?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a

Review Comment:
   Should we add a `DCHECK_EQ(end_offset, num_levels)` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org