Posted to github@arrow.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/02/15 07:12:06 UTC

[GitHub] [arrow] wgtmac opened a new pull request, #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

wgtmac opened a new pull request, #34193:
URL: https://github.com/apache/arrow/pull/34193

   ### Rationale for this change
   
   Parquet data page v2 requires that pages change on record boundaries, i.e. a record must not span multiple pages. Currently the parquet-cpp writer does not enforce this.
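   
   To illustrate what "record boundary" means here: in the repetition-level encoding, a new record starts at every level whose repetition level is 0. The following is only a minimal sketch under that assumption; `CountRecordStarts` and `IsRecordBoundary` are hypothetical helper names and are not part of parquet-cpp.
   
   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <vector>
   
   // Hypothetical helper: number of records that begin within `rep_levels`.
   // A record begins at every level whose repetition level is 0.
   inline int64_t CountRecordStarts(const std::vector<int16_t>& rep_levels) {
     int64_t count = 0;
     for (int16_t level : rep_levels) {
       if (level == 0) ++count;
     }
     return count;
   }
   
   // Hypothetical helper: true if a page may start at index `split`,
   // i.e. the level at `split` begins a new record.
   inline bool IsRecordBoundary(const std::vector<int16_t>& rep_levels, int64_t split) {
     assert(split >= 0 && split < static_cast<int64_t>(rep_levels.size()));
     return rep_levels[split] == 0;
   }
   ```
   
   With levels `{0, 1, 1, 0, 1}` there are two records, and a page may begin at index 0 or 3 but not at index 1, 2 or 4.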
   
   ### What changes are included in this PR?
   
   Change `ColumnWriter` to split data pages on record boundaries when data page v2 is used.
   
   ### Are these changes tested?
   
   Yes, by the new test `TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries)`.
   
   ### Are there any user-facing changes?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1430861500

   :warning: GitHub issue #34142 **has been automatically assigned in GitHub** to PR creator.




[GitHub] [arrow] wgtmac commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1439911145

   Please take another pass, thanks! @wjones127 




[GitHub] [arrow] ursabot commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1443090029

   Benchmark runs are scheduled for baseline = 29553fd679ec3c114ae6e1eef8e279e55ce55a18 and contender = b888f4d6c7dc490ce17b9f64d32af23ffc6f4617. b888f4d6c7dc490ce17b9f64d32af23ffc6f4617 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/2822e94bfce64e7e95b50f7b75a8a772...e6591b2416ab449b8e5a7faa5b9adbe5/)
   [Failed :arrow_down:0.43% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/99f05db73f774fcc833484c0ccf170e8...b7a5892dee2948309a4042c08831e46a/)
   [Finished :arrow_down:0.51% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/6414883eb4b14360a741d025da9897a1...88b5edfff5484d78a7d41410a8331bd1/)
   [Finished :arrow_down:0.54% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/10bca693950c4a23acef3f7105667dd4...387a28724e434f849ef4a047f0b18b98/)
   Buildkite builds:
   [Finished] [`b888f4d6` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2428)
   [Failed] [`b888f4d6` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2458)
   [Finished] [`b888f4d6` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2425)
   [Finished] [`b888f4d6` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2449)
   [Finished] [`29553fd6` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2427)
   [Failed] [`29553fd6` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2457)
   [Finished] [`29553fd6` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2424)
   [Finished] [`29553fd6` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2448)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] wgtmac commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1430863725

   @wjones127 @emkornfield @pitrou @etseidl @mapleFU Please take a look and let me know whether this is a clean fix.




[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114064937


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   And in the normal case, `end_offset` tends to extend past `batch_size` to the next record boundary, so it will not need a check like this.





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114062217


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   Yes





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113769699


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   Yes, `num_levels` is the length of `def_levels` and `rep_levels`. It is used by various functions in this file so it would be better to be consistent.
   ```cpp
     Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                                 int64_t num_levels, const ::arrow::Array& array,
                                 ArrowWriteContext* context, bool maybe_parent_nulls);
   
     Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                            int64_t num_levels, const ::arrow::Array& array,
                            ArrowWriteContext* context, bool maybe_parent_nulls);
   ```
   
   IMHO, "depth" is the value at each slot of `def_levels` and `rep_levels`. So the name `num_levels` makes sense to me. WDYT?





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1109382565


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,33 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+    action(offset, end_offset - offset, /*check_page=*/end_offset < num_levels);

Review Comment:
   No, it slices the batch into chunks where each chunk ends at a record boundary, except the last chunk. So all chunks except the last get the chance to check the page size. This is similar to the `less robust solution` you've mentioned above.
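   
   A minimal standalone sketch of that slicing, for illustration only. It records the chunks instead of invoking `action`, and `Chunk` and `SliceOnRecordBoundaries` are hypothetical names that are not part of parquet-cpp:
   
   ```cpp
   #include <algorithm>
   #include <cassert>
   #include <cstdint>
   #include <vector>
   
   // Hypothetical record of one call to `action(offset, length, check_page)`.
   struct Chunk {
     int64_t offset;
     int64_t length;
     bool check_page;
   };
   
   // Slice the levels into chunks of at least `batch_size` levels, extending each
   // chunk until the next level with rep_level == 0 (the start of a new record).
   inline std::vector<Chunk> SliceOnRecordBoundaries(const std::vector<int16_t>& rep_levels,
                                                     int64_t batch_size) {
     assert(batch_size > 0);
     const int64_t num_levels = static_cast<int64_t>(rep_levels.size());
     std::vector<Chunk> chunks;
     int64_t offset = 0;
     while (offset < num_levels) {
       int64_t end_offset = std::min(offset + batch_size, num_levels);
       while (end_offset < num_levels && rep_levels[end_offset] != 0) {
         ++end_offset;
       }
       // Only a chunk that stops before the end of the input is known to end on
       // a record boundary, so only such a chunk may check the page size.
       chunks.push_back({offset, end_offset - offset, end_offset < num_levels});
       offset = end_offset;
     }
     return chunks;
   }
   ```
   
   For `rep_levels = {0, 1, 1, 0, 1, 0, 1, 1}` and `batch_size = 2`, this yields the chunks `[0, 3)` and `[3, 5)` with the page size check enabled, and a last chunk `[5, 8)` with the check disabled.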





[GitHub] [arrow] emkornfield commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1109334136


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,33 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+    action(offset, end_offset - offset, /*check_page=*/end_offset < num_levels);

Review Comment:
   This seems to assume that `rep_levels` always ends at a record boundary? I can't recall off the top of my head whether this is documented, or whether it holds true in general, even for the Arrow case. It seems a more robust solution would be to propagate the offset back from this method once it reaches the end, for the callers to confirm.
   
   A less robust solution would be to at least sanity check that, if there is more than one rep_level, `rep_levels[0] == 0`.





[GitHub] [arrow] wjones127 merged pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 merged PR #34193:
URL: https://github.com/apache/arrow/pull/34193




[GitHub] [arrow] wjones127 commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1116335190


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   This is fine to merge as is.





[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114061205


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   Oh, I understand now. The final batch covers `[offset, end_offset)`.
   
   This patch splits it into two actions:
   
   ```
   1. flush [offset, beginning-of-last-record) with the page size check
   2. flush [beginning-of-last-record, end_offset) with the page flush suppressed
   ```
   
   Am I right?
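   
   To check my understanding, here is a small standalone sketch of how the final batch would be split. It mirrors the loop in the diff above but only records the calls; `WriteCall` and `SplitFinalBatch` are hypothetical names, not the actual parquet-cpp code:
   
   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <vector>
   
   // Hypothetical record of one call to `action(offset, length, check_page_size)`.
   struct WriteCall {
     int64_t offset;
     int64_t length;
     bool check_page_size;
   };
   
   // Split the final batch [offset, num_levels) at the beginning of its last record.
   inline std::vector<WriteCall> SplitFinalBatch(const std::vector<int16_t>& rep_levels,
                                                 int64_t offset) {
     const int64_t num_levels = static_cast<int64_t>(rep_levels.size());
     assert(offset >= 0 && offset < num_levels);
     std::vector<WriteCall> calls;
   
     // Walk backwards to the beginning of the last record (rep_level == 0).
     int64_t last_record_begin = num_levels - 1;
     while (last_record_begin >= offset && rep_levels[last_record_begin] != 0) {
       --last_record_begin;
     }
   
     if (offset < last_record_begin) {
       // Everything before the last record is complete, so the page size may be checked.
       calls.push_back({offset, last_record_begin - offset, true});
       offset = last_record_begin;
     }
   
     // The last record may continue in the next batch, so no page flush here.
     calls.push_back({offset, num_levels - offset, false});
     return calls;
   }
   ```
   
   With `rep_levels = {0, 1, 0, 1, 1}` and `offset = 0`, this gives `[0, 2)` with the check and `[2, 5)` without it. If the batch contains no later record start, only the second call is made.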





[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113775613


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {

Review Comment:
   So the number of levels actually written could be larger than `batch_size`? It would cover all levels up to the next record that starts with `rep_level = 0`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1373,11 +1426,13 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
   }
 
-  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
+  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
+                                    bool check_page) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
 
-    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    if (check_page &&

Review Comment:
   Can we regard `!check_page` as forcing no page flush?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;

Review Comment:
   What if `num_levels = 0` and `end_offset = 0`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a

Review Comment:
   Should we add a `DCHECK_EQ(end_offset, num_levels)` here?





[GitHub] [arrow] wjones127 commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113516374


##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,140 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          schema::Int32("required", Repetition::REQUIRED),
+                          schema::Int32("optional", Repetition::OPTIONAL),
+                          schema::Int32("repeated", Repetition::REPEATED),
+                      }));
+  // Write 11 levels at a time
+  constexpr int64_t batch_size = 11;
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->write_batch_size(batch_size)
+                        ->data_pagesize(1)
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_levels = 100;
+  const std::vector<int32_t> values(num_levels, 1024);
+  std::array<int16_t, num_levels> def_levels;
+  std::array<int16_t, num_levels> rep_levels;
+  for (int32_t i = 0; i < num_levels; i++) {
+    def_levels[i] = i % 2 == 0 ? 1 : 0;
+    rep_levels[i] = i % 2 == 0 ? 0 : 1;
+  }
+
+  auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+  // Write a null value at every other row.
+  auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr, values.data());
+
+  // Each row has repeated twice.
+  auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Check if pages are changed on record boundaries.
+  constexpr int num_columns = 3;
+  const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+  for (int i = 0; i < num_columns; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_rows = 0;
+    int64_t num_pages = 0;
+    std::shared_ptr<Page> page;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      if (i < 2) {
+        EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+      } else {
+        // Make sure repeated column has 2 values per row and not span multiple pages.
+        EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+      }
+      num_rows += data_page->num_rows();
+      num_pages++;
+    }
+    EXPECT_EQ(num_levels, num_rows);
+    EXPECT_EQ(expected_num_pages[i], num_pages);
+  }
+}
+
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesLargeBatchSize) {

Review Comment:
   For this test, I would be more interested in seeing a column where the number of repeats is greater than `batch_size`. What do you think? Perhaps also add a case where the number of repeats equals `batch_size - 1`.
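
One way to build inputs for the cases suggested above is sketched here. `MakeRepLevels` is a hypothetical helper, not part of parquet-cpp, and it reads "number of repeats" as the number of levels per record. It assumes a single repeated leaf, where a repetition level of 0 starts a new record and 1 continues the current one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not a parquet-cpp API): builds repetition levels for
// `num_records` records that each span `levels_per_record` levels.
// Assumption: single repeated leaf, so 0 starts a record and 1 continues it.
std::vector<int16_t> MakeRepLevels(int64_t num_records, int64_t levels_per_record) {
  std::vector<int16_t> rep_levels;
  rep_levels.reserve(static_cast<std::size_t>(num_records * levels_per_record));
  for (int64_t r = 0; r < num_records; ++r) {
    for (int64_t i = 0; i < levels_per_record; ++i) {
      rep_levels.push_back(static_cast<int16_t>(i == 0 ? 0 : 1));
    }
  }
  return rep_levels;
}
```

With `batch_size = 11`, `MakeRepLevels(5, 12)` would give records longer than one batch, and `MakeRepLevels(5, 10)` would give records of `batch_size - 1` levels.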



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   Why is this called `num_levels`? It seems to represent the number of values, and in turn the length of `def_levels` and `rep_levels`. Is that correct? If so, `num_values` seems more appropriate, as I think `num_levels` implies "depth" rather than "length".
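
The difference between the two counts can be sketched as follows. In Parquet, a leaf value is stored only when its definition level equals the column's maximum definition level, so the number of stored values can be smaller than the number of levels. `CountStoredValues` is a hypothetical helper for illustration, not a parquet-cpp API.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: counts the levels that carry a stored (non-null) value,
// i.e. those whose definition level equals the maximum definition level.
int64_t CountStoredValues(const std::vector<int16_t>& def_levels,
                          int16_t max_def_level) {
  int64_t count = 0;
  for (int16_t level : def_levels) {
    if (level == max_def_level) {
      ++count;
    }
  }
  return count;
}
```

For the pattern used in the quoted test (definition levels alternating 1, 0 over 100 levels, with a maximum definition level of 1), there are 100 levels but only 50 stored values.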





[GitHub] [arrow] wjones127 commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1116285459


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   Okay, that's fair. This is definitely a pre-existing issue, but I wish we used something like `std::span` instead of separate pointers and lengths everywhere. That would avoid these kinds of naming issues.
   





[GitHub] [arrow] github-actions[bot] commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1430861468

   * Closes: #34142




[GitHub] [arrow] wgtmac commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1439697272

   > The skeleton LGTM. I wonder whether there is a corner case:
   > 
   > ```
   > rep-levels 1, 1, 0
   > batch = 3
   > ```
   > 
   > Here we find that the last rep-level is `0`, but the last record should not be flushed?
   
   In your case, the batch will call action twice:
   - levels: 1, 1, w/ page size check
   - levels: 0, w/o page size check
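
The two calls described above can be reproduced with a standalone sketch of the splitting loop. It restates the logic of the quoted `DoInBatches` hunk but records the calls instead of invoking `action`. The names `Call` and `SplitOnRecordBoundaries` are hypothetical, and the final `offset = end_offset;` is an assumption, since the quoted hunk is cut off before the end of the loop.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One recorded call to `action` (hypothetical type for illustration).
struct Call {
  int64_t offset;
  int64_t length;
  bool check_page_size;
};

// Sketch of the record-boundary-aware splitting from the quoted hunk.
std::vector<Call> SplitOnRecordBoundaries(const std::vector<int16_t>& rep_levels,
                                          int64_t batch_size) {
  std::vector<Call> calls;
  const int64_t num_levels = static_cast<int64_t>(rep_levels.size());
  int64_t offset = 0;
  while (offset < num_levels) {
    int64_t end_offset = std::min(offset + batch_size, num_levels);
    // Advance to the next record boundary (rep_level == 0), if any.
    while (end_offset < num_levels &&
           rep_levels[static_cast<std::size_t>(end_offset)] != 0) {
      ++end_offset;
    }
    if (end_offset < num_levels) {
      // The chunk ends on a record boundary, so the page size may be checked.
      calls.push_back({offset, end_offset - offset, true});
    } else {
      // Last chunk: scan backwards for the beginning of the last record.
      int64_t last_begin = num_levels - 1;
      while (last_begin >= offset &&
             rep_levels[static_cast<std::size_t>(last_begin)] != 0) {
        --last_begin;
      }
      if (offset < last_begin) {
        calls.push_back({offset, last_begin - offset, true});
        offset = last_begin;
      }
      // The trailing record may be incomplete, so skip the page size check.
      calls.push_back({offset, end_offset - offset, false});
    }
    offset = end_offset;  // Assumption: not visible in the quoted hunk.
  }
  return calls;
}
```

For rep-levels `1, 1, 0` with a batch size of 3, this yields a call covering the first two levels with the page size check and a call covering the last level without it.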




[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114045512


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   Would this issue a call such as:
   
   ```c++
   action(offset, 0, false);
   ```
   
   if the last rep-level is actually `0`?





[GitHub] [arrow] mapleFU commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114061205


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   Oh, I understand now. The final batch covers `[offset, end_offset)`.
   
   This patch splits it into two actions:
   
   ```
   1. flush [offset, beginning-of-last-record) with the page flush check
   2. flush [beginning-of-last-record, end_offset) without flushing the page
   ```
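
The split point used by these two steps comes from a backward scan over the repetition levels. A minimal sketch of that scan, mirroring the loop in the quoted hunk, is shown here. `FindLastRecordBegin` is a hypothetical name, not a function in parquet-cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the backward scan in the quoted hunk.
// Returns the index in [offset, size) where the last record begins, or
// offset - 1 if no level in that range has rep_level == 0.
int64_t FindLastRecordBegin(const std::vector<int16_t>& rep_levels, int64_t offset) {
  int64_t pos = static_cast<int64_t>(rep_levels.size()) - 1;
  while (pos >= offset && rep_levels[static_cast<std::size_t>(pos)] != 0) {
    --pos;
  }
  return pos;
}
```

When the result is greater than `offset`, the levels before it end on a record boundary and can be written with the page size check. The levels from the result onward are written without it, presumably because a later batch may continue that record.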





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1114056087


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1015,11 +1015,59 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page_size=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page_size=*/true);
+    } else {
+      DCHECK_EQ(end_offset, num_levels);
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;
+      while (last_record_begin_offset >= offset &&
+             rep_levels[last_record_begin_offset] != 0) {
+        last_record_begin_offset--;
+      }
+
+      if (offset < last_record_begin_offset) {
+        // We have found the beginning of last record and can check page size.
+        action(offset, last_record_begin_offset - offset, /*check_page_size=*/true);
+        offset = last_record_begin_offset;
+      }
+
+      // There is no record boundary in this chunk and cannot check page size.
+      action(offset, end_offset - offset, /*check_page_size=*/false);

Review Comment:
   That won't happen. Your case would require both `end_offset == offset` and `end_offset == num_levels` (see line 1050). However, `offset` (which is a valid index into the levels buffer) is always smaller than `num_levels`.





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113807163


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {

Review Comment:
   The `batch_size` here is used to cut the input levels into smaller batches.
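
How `batch_size` cuts the input can be sketched with a standalone restatement of the simple overload from the quoted hunk, recording the batches instead of invoking `action`. `Batch` and `CutIntoBatches` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// One batch of levels, as (offset, length). Hypothetical type for illustration.
struct Batch {
  int64_t offset;
  int64_t length;
};

// Sketch of the fixed-size splitting done by the simple DoInBatches overload.
std::vector<Batch> CutIntoBatches(int64_t total, int64_t batch_size) {
  std::vector<Batch> batches;
  const int64_t num_batches = total / batch_size;
  for (int64_t round = 0; round < num_batches; ++round) {
    batches.push_back({round * batch_size, batch_size});
  }
  // Remaining levels that do not fill a whole batch.
  if (total % batch_size > 0) {
    batches.push_back({num_batches * batch_size, total % batch_size});
  }
  return batches;
}
```

For 100 levels and a batch size of 11, as in the quoted test, this gives nine batches of 11 levels and one batch of 1 level. That is consistent with the 10 pages expected for the non-repeated columns, assuming every checked batch flushes a page when `data_pagesize` is 1.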



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1373,11 +1426,13 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
   }
 
-  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
+  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
+                                    bool check_page) {
     num_buffered_values_ += num_levels;
     num_buffered_encoded_values_ += num_values;
 
-    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    if (check_page &&

Review Comment:
   Yes



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a
+      // record boundary. Find the offset to beginning of last record in this chunk,
+      // so we can check page size.
+      int64_t last_record_begin_offset = num_levels - 1;

Review Comment:
   If `num_levels` = 0, it will not enter the while loop that begins at line 1036.



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+
+    // Find next record boundary (i.e. ref_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+
+    if (end_offset < num_levels) {
+      // This is not the last chunk of batch and end_offset is a record boundary.
+      // It is a good chance to check the page size.
+      action(offset, end_offset - offset, /*check_page=*/true);
+    } else {
+      // This is the last chunk of batch, and we do not know whether end_offset is a

Review Comment:
   Fixed





[GitHub] [arrow] emkornfield commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1439487839

   Generally, looks OK to me once other reviewers are happy.




[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113833768


##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,140 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
   }
 }
 
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          schema::Int32("required", Repetition::REQUIRED),
+                          schema::Int32("optional", Repetition::OPTIONAL),
+                          schema::Int32("repeated", Repetition::REPEATED),
+                      }));
+  // Write 11 levels at a time
+  constexpr int64_t batch_size = 11;
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->write_batch_size(batch_size)
+                        ->data_pagesize(1)
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_levels = 100;
+  const std::vector<int32_t> values(num_levels, 1024);
+  std::array<int16_t, num_levels> def_levels;
+  std::array<int16_t, num_levels> rep_levels;
+  for (int32_t i = 0; i < num_levels; i++) {
+    def_levels[i] = i % 2 == 0 ? 1 : 0;
+    rep_levels[i] = i % 2 == 0 ? 0 : 1;
+  }
+
+  auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+  // Write a null value at every other row.
+  auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr, values.data());
+
+  // Each row has repeated twice.
+  auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+  repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+                              values.data());
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Check if pages are changed on record boundaries.
+  constexpr int num_columns = 3;
+  const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+  for (int i = 0; i < num_columns; ++i) {
+    auto page_reader = row_group_reader->GetColumnPageReader(i);
+    int64_t num_rows = 0;
+    int64_t num_pages = 0;
+    std::shared_ptr<Page> page;
+    while ((page = page_reader->NextPage()) != nullptr) {
+      auto data_page = std::static_pointer_cast<DataPageV2>(page);
+      if (i < 2) {
+        EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+      } else {
+        // Make sure repeated column has 2 values per row and not span multiple pages.
+        EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+      }
+      num_rows += data_page->num_rows();
+      num_pages++;
+    }
+    EXPECT_EQ(num_levels, num_rows);
+    EXPECT_EQ(expected_num_pages[i], num_pages);
+  }
+}
+
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesLargeBatchSize) {

Review Comment:
   Updated the test to cover different repeat patterns.





[GitHub] [arrow] emkornfield commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "emkornfield (via GitHub)" <gi...@apache.org>.
emkornfield commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113871128


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   +1 for consistency





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1109387229


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,33 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    // Find next record boundary (i.e. rep_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+    action(offset, end_offset - offset, /*check_page=*/end_offset < num_levels);

Review Comment:
   The caveat is that if `num_levels` is less than `batch_size`, we cannot check the page size because the loop on line 1039 no longer works. I will fix it shortly.
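
The caveat can be reproduced with a standalone sketch of the quoted loop. This is an approximation, not the PR's code: `Batch` and `SplitOnRecordBoundaries` are hypothetical names, the batches are collected into a vector instead of being passed to `action`, and the `offset = end_offset` step is assumed because the quoted hunk is cut off before it.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record of one call to `action` in the quoted loop.
struct Batch {
  int64_t offset;
  int64_t length;
  bool check_page;
};

// Mirrors the quoted loop: each batch is extended until the next
// record boundary, i.e. the next position whose rep_level is 0.
std::vector<Batch> SplitOnRecordBoundaries(const std::vector<int16_t>& rep_levels,
                                           int64_t batch_size) {
  assert(batch_size > 0);
  std::vector<Batch> batches;
  const int64_t num_levels = static_cast<int64_t>(rep_levels.size());
  int64_t offset = 0;
  while (offset < num_levels) {
    int64_t end_offset = std::min(offset + batch_size, num_levels);
    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
      ++end_offset;
    }
    // check_page is false for the batch that reaches the end of the input.
    batches.push_back({offset, end_offset - offset, end_offset < num_levels});
    offset = end_offset;  // Assumed; not visible in the quoted hunk.
  }
  return batches;
}
```

When the whole input fits in one batch, the only batch produced has `check_page == false`, so under this sketch the page size is never checked for that write.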





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1109403990


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,33 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,
+                        bool pages_change_on_record_boundaries) {
+  if (!pages_change_on_record_boundaries || !rep_levels) {
+    // If rep_levels is null, then we are writing a non-repeated column.
+    // In this case, every record contains only one level.
+    return DoInBatches(num_levels, batch_size, std::forward<Action>(action));
+  }
+
+  int64_t offset = 0;
+  while (offset < num_levels) {
+    int64_t end_offset = std::min(offset + batch_size, num_levels);
+    // Find next record boundary (i.e. rep_level = 0)
+    while (end_offset < num_levels && rep_levels[end_offset] != 0) {
+      end_offset++;
+    }
+    action(offset, end_offset - offset, /*check_page=*/end_offset < num_levels);

Review Comment:
   Fixed and a new test case has been added. Please check @emkornfield 





[GitHub] [arrow] wgtmac commented on a diff in pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1116296631


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1014,11 +1014,58 @@ template <typename Action>
 inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
   int64_t num_batches = static_cast<int>(total / batch_size);
   for (int round = 0; round < num_batches; round++) {
-    action(round * batch_size, batch_size);
+    action(round * batch_size, batch_size, /*check_page=*/true);
   }
   // Write the remaining values
   if (total % batch_size > 0) {
-    action(num_batches * batch_size, total % batch_size);
+    action(num_batches * batch_size, total % batch_size, /*check_page=*/true);
+  }
+}
+
+template <typename Action>
+inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
+                        int64_t num_levels, int64_t batch_size, Action&& action,

Review Comment:
   These pointers are optional and are already part of the public API. Changing them to spans would be a breaking change; that can be a separate patch and needs more discussion.
   
   Do you have any other concerns? Or can this be merged first to unblock the page index implementation in another PR of mine? @wjones127



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on pull request #34193: GH-34142: [C++][Parquet] Fix record not to span multiple pages

Posted by "wgtmac (via GitHub)" <gi...@apache.org>.
wgtmac commented on PR #34193:
URL: https://github.com/apache/arrow/pull/34193#issuecomment-1438712512

   Gentle ping @emkornfield 
   
   @pitrou @wjones127 Do you have time to take a look?

