You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/01/02 16:56:51 UTC

[arrow] branch master updated: ARROW-18240: [R] head() is crashing on some nightly builds (#14582)

This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b1a48c78a3 ARROW-18240: [R] head() is crashing on some nightly builds (#14582)
b1a48c78a3 is described below

commit b1a48c78a318402daab1d0f974825373ef41b293
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Mon Jan 2 12:56:42 2023 -0400

    ARROW-18240: [R] head() is crashing on some nightly builds (#14582)
    
    This change attempts to address these two nightly build failures:
    
    - https://github.com/ursacomputing/crossbow/actions/runs/3368026889/jobs/5586109693#step:10:3813
    - https://github.com/ursacomputing/crossbow/actions/runs/3368024633/jobs/5586105172#step:9:3813
    
    
    Authored-by: Dewey Dunnington <de...@voltrondata.com>
    Signed-off-by: Dewey Dunnington <de...@voltrondata.com>
---
 r/src/compute-exec.cpp      | 11 ++++++++++-
 r/src/recordbatchreader.cpp | 13 +++++++------
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index cb8ebd588b..feff0a2187 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -30,6 +30,15 @@
 #include <iostream>
 #include <optional>
 
+// GH-15151: find a way to make GetIOThreadPool() available without this hack
+namespace arrow {
+namespace io {
+namespace internal {
+arrow::internal::ThreadPool* GetIOThreadPool();
+}
+}  // namespace io
+}  // namespace arrow
+
 namespace compute = ::arrow::compute;
 
 std::shared_ptr<compute::FunctionOptions> make_compute_options(std::string func_name,
@@ -453,7 +462,7 @@ std::shared_ptr<compute::ExecNode> ExecNode_SourceNode(
   arrow::compute::SourceNodeOptions options{
       /*output_schema=*/reader->schema(),
       /*generator=*/ValueOrStop(
-          compute::MakeReaderGenerator(reader, arrow::internal::GetCpuThreadPool()))};
+          compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))};
 
   return MakeExecNodeOrStop("source", plan.get(), {}, options);
 }
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 8e9df12174..9ea4d91701 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -128,12 +128,12 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader {
  public:
   RecordBatchReaderHead(std::shared_ptr<arrow::RecordBatchReader> reader,
                         int64_t num_rows)
-      : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
+      : done_(false), schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
 
   std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
 
   arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
-    if (!reader_) {
+    if (done_) {
       // Close() has been called
       batch_out = nullptr;
       return arrow::Status::OK();
@@ -161,16 +161,17 @@ class RecordBatchReaderHead : public arrow::RecordBatchReader {
   }
 
   arrow::Status Close() override {
-    if (reader_) {
+    if (done_) {
+      return arrow::Status::OK();
+    } else {
+      done_ = true;
       arrow::Status result = reader_->Close();
-      reader_.reset();
       return result;
-    } else {
-      return arrow::Status::OK();
     }
   }
 
  private:
+  bool done_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::RecordBatchReader> reader_;
   int64_t num_rows_;