You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/04 13:04:10 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #9615: ARROW-3316: [R] Multi-threaded conversion from R data.frame to Arrow table / record batch

westonpace commented on a change in pull request #9615:
URL: https://github.com/apache/arrow/pull/9615#discussion_r625721013



##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;

Review comment:
       You could use `FnOnce` in place of `std::function`.  It's in `arrow/util/functional.h`.  You should mostly be able to drop it in, except that you need to move the task when you run it (you might have to use a move iterator for `delayed_serial_tasks`) or when you pass it down into `parallel_tasks`.  The main advantage is that anything captured by the function will be released immediately after the function runs.  There have been some efforts in Arrow to be consistent about using `FnOnce` where possible (ARROW-10966, ARROW-11191).

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;
+
+  RTasks()
+      : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded(
+            arrow::internal::GetCpuThreadPool())) {}
+
+  Status Finish() {

Review comment:
       To avoid a nested deadlock, this method (`Finish()`) must never be called from a thread pool thread, because `Finish()` blocks on `parallel_tasks`.  You should be OK with your current usage, but a comment somewhere might not hurt.

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;
+
+  RTasks()
+      : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded(
+            arrow::internal::GetCpuThreadPool())) {}
+
+  Status Finish() {
+    Status status = Status::OK();
+
+    // run the delayed tasks now
+    for (auto& task : delayed_serial_tasks) {
+      status &= task();
+      if (!status.ok()) break;
+    }
+
+    // then wait for the parallel tasks to finish
+    status &= parallel_tasks->Finish();

Review comment:
       It would be nice if there were a good way to trigger the parallel_tasks to fail early if `status` was not ok here (and we broke out of the serial loop above).

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -168,46 +207,85 @@ bool is_NA<int64_t>(int64_t value) {
 }
 
 template <typename T>
-struct RVectorVisitor {
+class RVectorIterator {
+ public:
+  using value_type = T;
+  RVectorIterator(SEXP x, int64_t start)
+      : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {}
+
+  RVectorIterator& operator++() {
+    ++ptr_x_;
+    return *this;
+  }
+
+  const T operator*() const { return *ptr_x_; }
+
+ private:
+  const T* ptr_x_;
+};
+
+template <typename T>
+class RVectorIterator_ALTREP {
+ public:
+  using value_type = T;
   using data_type =
       typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type;
   using r_vector_type = cpp11::r_vector<data_type>;
+  using r_vector_iterator = typename r_vector_type::const_iterator;
 
-  template <typename AppendNull, typename AppendValue>
-  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
-                      AppendValue&& append_value) {
-    r_vector_type values(x);
-    auto it = values.begin();
-
-    for (R_xlen_t i = 0; i < size; i++, ++it) {
-      auto value = GetValue(*it);
-
-      if (is_NA<T>(value)) {
-        RETURN_NOT_OK(append_null());
-      } else {
-        RETURN_NOT_OK(append_value(value));
-      }
-    }
+  RVectorIterator_ALTREP(SEXP x, int64_t start)
+      : vector_(x), it_(vector_.begin() + start) {}
 
-    return Status::OK();
+  RVectorIterator_ALTREP& operator++() {
+    ++it_;
+    return *this;
   }
 
+  const T operator*() const { return GetValue(*it_); }
+
   static T GetValue(data_type x) { return x; }
+
+ private:
+  r_vector_type vector_;
+  r_vector_iterator it_;
 };
 
 template <>
-int64_t RVectorVisitor<int64_t>::GetValue(double x) {
+int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) {
   int64_t value;
   memcpy(&value, &x, sizeof(int64_t));
   return value;
 }
 
+template <typename Iterator, typename AppendNull, typename AppendValue>
+Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null,
+                   AppendValue&& append_value) {
+  for (R_xlen_t i = 0; i < n; i++, ++it) {

Review comment:
       Minor: You're pretty close to being able to use a range-based for loop here.  I'm not sure how difficult it would be to create an `end()` pointer and an iterator equality function.

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -168,46 +207,85 @@ bool is_NA<int64_t>(int64_t value) {
 }
 
 template <typename T>
-struct RVectorVisitor {
+class RVectorIterator {
+ public:
+  using value_type = T;
+  RVectorIterator(SEXP x, int64_t start)
+      : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {}
+
+  RVectorIterator& operator++() {
+    ++ptr_x_;
+    return *this;
+  }
+
+  const T operator*() const { return *ptr_x_; }
+
+ private:
+  const T* ptr_x_;
+};
+
+template <typename T>
+class RVectorIterator_ALTREP {
+ public:
+  using value_type = T;
   using data_type =
       typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type;
   using r_vector_type = cpp11::r_vector<data_type>;
+  using r_vector_iterator = typename r_vector_type::const_iterator;
 
-  template <typename AppendNull, typename AppendValue>
-  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
-                      AppendValue&& append_value) {
-    r_vector_type values(x);
-    auto it = values.begin();
-
-    for (R_xlen_t i = 0; i < size; i++, ++it) {
-      auto value = GetValue(*it);
-
-      if (is_NA<T>(value)) {
-        RETURN_NOT_OK(append_null());
-      } else {
-        RETURN_NOT_OK(append_value(value));
-      }
-    }
+  RVectorIterator_ALTREP(SEXP x, int64_t start)
+      : vector_(x), it_(vector_.begin() + start) {}
 
-    return Status::OK();
+  RVectorIterator_ALTREP& operator++() {
+    ++it_;
+    return *this;
   }
 
+  const T operator*() const { return GetValue(*it_); }
+
   static T GetValue(data_type x) { return x; }
+
+ private:
+  r_vector_type vector_;
+  r_vector_iterator it_;
 };
 
 template <>
-int64_t RVectorVisitor<int64_t>::GetValue(double x) {
+int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) {
   int64_t value;
   memcpy(&value, &x, sizeof(int64_t));
   return value;
 }
 
+template <typename Iterator, typename AppendNull, typename AppendValue>
+Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null,
+                   AppendValue&& append_value) {
+  for (R_xlen_t i = 0; i < n; i++, ++it) {
+    auto value = *it;
+
+    if (is_NA<typename Iterator::value_type>(value)) {
+      RETURN_NOT_OK(append_null());
+    } else {
+      RETURN_NOT_OK(append_value(value));
+    }
+  }
+
+  return Status::OK();
+}
+
 class RConverter : public Converter<SEXP, RConversionOptions> {
  public:
   virtual Status Append(SEXP) { return Status::NotImplemented("Append"); }
 
   virtual Status Extend(SEXP values, int64_t size) {
-    return Status::NotImplemented("ExtendMasked");
+    return Status::NotImplemented("Extend");
+  }
+
+  // by default, just delay the ->Extend(), i.e. not run in parallel
+  // implementations might redefine so that ->Extend() is run in parallel
+  virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) {
+    auto task = [this, values, size]() { return this->Extend(values, size); };

Review comment:
       The `this` capture is possibly worrisome.  It seems you should be OK since you block `Table__from_dots` until `Finish` is done, but what if an error happens on line 1332 and `StopIfNotOk` bails out?  Is it possible that it leaves (and allows `this` to go out of scope) before all the parallel tasks have been cleared or flushed through?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org