You are viewing a plain-text version of this content; the canonical link to the original message is not preserved in this version.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/22 14:09:53 UTC

[arrow] branch main updated: GH-36092: [C++] Simplify concurrency in as-of-join node (#36094)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b29d9584e GH-36092: [C++] Simplify concurrency in as-of-join node (#36094)
1b29d9584e is described below

commit 1b29d9584e3db3949c1c9d36f146472c08aff8d3
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Thu Jun 22 17:09:46 2023 +0300

    GH-36092: [C++] Simplify concurrency in as-of-join node (#36094)
    
    ### What changes are included in this PR?
    
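    The key-hasher cache invalidation and the memo-store time update are moved out of `InputState::Push` (the producer side) and into the code that advances to the next queued batch on the processing thread. Because these fields are now only touched by the processing thread, `MemoStore::current_time_` and `KeyHasher::batch_` no longer need to be `std::atomic`, and the compare-and-swap loop in `MemoStore::UpdateTime` is replaced by a plain compare-and-assign.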
    
    ### Are these changes tested?
    
    Yes, by existing tests.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #36092
    
    Authored-by: Yaron Gvili <rt...@hotmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/asof_join_node.cc | 24 ++++++++++--------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index d3c988e18e..98e5918ebb 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -344,7 +344,7 @@ struct MemoStore {
   // the time of the current entry, defaulting to 0.
   // when entries with a time less than T are removed, the current time is updated to the
   // time of the next (by-time) and now-current entry or to T if no such entry exists.
-  std::atomic<OnType> current_time_;
+  OnType current_time_;
   // current entry per key
   std::unordered_map<ByType, Entry> entries_;
   // future entries per key
@@ -364,21 +364,16 @@ struct MemoStore {
     std::swap(index_, memo.index_);
 #endif
     std::swap(no_future_, memo.no_future_);
-    current_time_ = memo.current_time_.exchange(static_cast<OnType>(current_time_));
+    std::swap(current_time_, memo.current_time_);
     entries_.swap(memo.entries_);
     future_entries_.swap(memo.future_entries_);
     times_.swap(memo.times_);
   }
 
-  // Updates the current time to `ts` if it is less. A different thread may win the race
-  // to update the current time to more than `ts` but not to less. Returns whether the
-  // current time was changed from its value at the beginning of this invocation.
+  // Updates the current time to `ts` if it is less. Returns true if updated.
   bool UpdateTime(OnType ts) {
-    OnType prev_time = current_time_;
-    bool update = prev_time < ts;
-    while (prev_time < ts && !current_time_.compare_exchange_weak(prev_time, ts)) {
-      // intentionally empty - standard CAS loop
-    }
+    bool update = current_time_ < ts;
+    if (update) current_time_ = ts;
     return update;
   }
 
@@ -529,7 +524,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  std::atomic<const RecordBatch*> batch_;
+  const RecordBatch* batch_;
   std::vector<HashType> hashes_;
   LightContext ctx_;
   std::vector<KeyColumnArray> column_arrays_;
@@ -822,8 +817,11 @@ class InputState {
         ++batches_processed_;
         latest_ref_row_ = 0;
         have_active_batch &= !queue_.TryPop();
-        if (have_active_batch)
+        if (have_active_batch) {
           DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0);  // empty batches disallowed
+          key_hasher_->Invalidate();  // batch changed - invalidate key hasher's cache
+          memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0));  // time changed
+        }
       }
     }
     return have_active_batch;
@@ -899,8 +897,6 @@ class InputState {
 
   Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
     if (rb->num_rows() > 0) {
-      key_hasher_->Invalidate();  // batch changed - invalidate key hasher's cache
-      memo_.UpdateTime(GetTime(rb.get(), 0));  // time changed - update in MemoStore
       queue_.Push(rb);  // only after above updates - push batch for processing
     } else {
       ++batches_processed_;  // don't enqueue empty batches, just record as processed