Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/04 00:23:06 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13771: ARROW-17278: [C++][Benchmarking] Add AsofJoin Ordering Assertion and Benchmark Fixes

westonpace commented on code in PR #13771:
URL: https://github.com/apache/arrow/pull/13771#discussion_r937244194


##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -77,14 +88,18 @@ static void TableJoinOverhead(benchmark::State& state,
     state.PauseTiming();
     ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
                          ExecPlan::Make(&ctx));
-    std::vector<ExecNode*> input_nodes = {*arrow::compute::MakeExecNode(
-        "table_source", plan.get(), {},
-        arrow::compute::TableSourceNodeOptions(left_table_stats.table, batch_size))};
+    std::vector<std::shared_ptr<arrow::internal::ThreadPool>> right_thread_pools;
+    std::shared_ptr<arrow::internal::ThreadPool> left_thread_pool =

Review Comment:
   Let's add a comment (can be the same as previous) explaining why this dedicated thread pool is needed.
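
   For illustration only, a minimal sketch of what such a commented, dedicated pool could look like. The pool size and the wording of the rationale below are assumptions, not taken from the PR; the actual comment should restate the PR author's reason (or reuse the earlier comment in this file), and ThreadPool::Make / ASSERT_OK_AND_ASSIGN are already used elsewhere in this benchmark:

       #include "arrow/util/thread_pool.h"

       // Illustrative rationale: give each table source its own executor so that
       // source batch production does not compete with the join itself for the
       // plan's CPU threads (replace with the PR's actual reasoning).
       ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::internal::ThreadPool> left_thread_pool,
                            arrow::internal::ThreadPool::Make(/*threads=*/1));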



##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -49,6 +49,17 @@ static Result<TableStats> MakeTable(const TableGenerationProperties& properties)
   return Result<TableStats>({table, rows, rows * row_size});
 }
 
+static ExecNode* MakeTableSourceNode(std::shared_ptr<arrow::compute::ExecPlan> plan,

Review Comment:
   Can you add a comment of some kind explaining why you chose to use "table" directly instead of "table_source"?



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
 typedef int col_index_t;
 
 /**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for a bounded concurrent queue
  */
 template <class T>
-class ConcurrentQueue {
+class ConcurrentBoundedQueue {
+  size_t _remaining;
+  std::vector<T> _buffer;
+  mutable std::mutex _gate;
+  std::condition_variable _not_full;
+  std::condition_variable _not_empty;
+
+  size_t _next_push = 0;
+  size_t _next_pop = 0;
+
  public:
+  explicit ConcurrentBoundedQueue(size_t capacity)
+      : _remaining(capacity), _buffer(capacity) {}
+
+  // Push new value to queue, waiting for capacity indefinitely.
+  void Push(const T& t) {
+    std::unique_lock<std::mutex> lock(_gate);
+    _not_full.wait(lock, [&] { return _remaining > 0; });
+    _buffer[_next_push++] = t;
+    _next_push %= _buffer.size();
+    --_remaining;
+    _not_empty.notify_one();
+  }
+  // Get oldest value from queue, or wait indefinitely for it.
   T Pop() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    cond_.wait(lock, [&] { return !queue_.empty(); });
-    auto item = queue_.front();
-    queue_.pop();
-    return item;
-  }
-
-  void Push(const T& item) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    queue_.push(item);
-    cond_.notify_one();
+    std::unique_lock<std::mutex> lock(_gate);
+    _not_empty.wait(lock, [&] { return _remaining < _buffer.size(); });
+    T r = _buffer[_next_pop++];
+    _next_pop %= _buffer.size();
+    ++_remaining;
+    _not_full.notify_one();
+    return r;
   }
-
+  // Try to pop the oldest value from the queue (or return nullopt if none)
   util::optional<T> TryPop() {
-    // Try to pop the oldest value from the queue (or return nullopt if none)
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (queue_.empty()) {
-      return util::nullopt;
-    } else {
-      auto item = queue_.front();
-      queue_.pop();
-      return item;
-    }
+    std::unique_lock<std::mutex> lock(_gate);
+    if (_remaining == _buffer.size()) return util::nullopt;
+    T r = _buffer[_next_pop++];
+    _next_pop %= _buffer.size();
+    ++_remaining;
+    _not_full.notify_one();
+    return r;
   }
 
+  // Test whether empty
   bool Empty() const {
-    std::unique_lock<std::mutex> lock(mutex_);
-    return queue_.empty();
+    std::unique_lock<std::mutex> lock(_gate);
+    return _remaining == _buffer.size();
   }
 
+  int Size() const { return _buffer.size(); }

Review Comment:
   I don't remember if this kind of access outside the mutex causes TSAN errors or not.  Have you run TSAN locally?
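
   A hedged sketch of two ways the question could be made moot (whether TSAN actually flags the current form would still need to be verified locally):

       // Option A: take the same lock as the other accessors, so every member
       // read is covered by _gate and the question never arises.
       size_t Size() const {
         std::unique_lock<std::mutex> lock(_gate);
         return _buffer.size();
       }

       // Option B: document that _buffer is sized once in the constructor and
       // never resized, so reading _buffer.size() touches no shared mutable
       // state and should not need the lock.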



##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
 typedef int col_index_t;
 
 /**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for a bounded concurrent queue
  */
 template <class T>
-class ConcurrentQueue {
+class ConcurrentBoundedQueue {
+  size_t _remaining;
+  std::vector<T> _buffer;
+  mutable std::mutex _gate;
+  std::condition_variable _not_full;
+  std::condition_variable _not_empty;
+
+  size_t _next_push = 0;
+  size_t _next_pop = 0;
+
  public:
+  explicit ConcurrentBoundedQueue(size_t capacity)
+      : _remaining(capacity), _buffer(capacity) {}
+
+  // Push new value to queue, waiting for capacity indefinitely.
+  void Push(const T& t) {
+    std::unique_lock<std::mutex> lock(_gate);
+    _not_full.wait(lock, [&] { return _remaining > 0; });
+    _buffer[_next_push++] = t;
+    _next_push %= _buffer.size();
+    --_remaining;
+    _not_empty.notify_one();
+  }
+  // Get oldest value from queue, or wait indefinitely for it.
   T Pop() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    cond_.wait(lock, [&] { return !queue_.empty(); });
-    auto item = queue_.front();
-    queue_.pop();
-    return item;
-  }
-
-  void Push(const T& item) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    queue_.push(item);
-    cond_.notify_one();
+    std::unique_lock<std::mutex> lock(_gate);
+    _not_empty.wait(lock, [&] { return _remaining < _buffer.size(); });
+    T r = _buffer[_next_pop++];
+    _next_pop %= _buffer.size();
+    ++_remaining;
+    _not_full.notify_one();
+    return r;
   }
-
+  // Try to pop the oldest value from the queue (or return nullopt if none)
   util::optional<T> TryPop() {
-    // Try to pop the oldest value from the queue (or return nullopt if none)
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (queue_.empty()) {
-      return util::nullopt;
-    } else {
-      auto item = queue_.front();
-      queue_.pop();
-      return item;
-    }
+    std::unique_lock<std::mutex> lock(_gate);
+    if (_remaining == _buffer.size()) return util::nullopt;
+    T r = _buffer[_next_pop++];
+    _next_pop %= _buffer.size();
+    ++_remaining;
+    _not_full.notify_one();
+    return r;
   }
 
+  // Test whether empty
   bool Empty() const {
-    std::unique_lock<std::mutex> lock(mutex_);
-    return queue_.empty();
+    std::unique_lock<std::mutex> lock(_gate);
+    return _remaining == _buffer.size();
   }
 
+  int Size() const { return _buffer.size(); }
+
   // Un-synchronized access to front
   // For this to be "safe":
   // 1) the caller logically guarantees that queue is not empty
   // 2) pop/try_pop cannot be called concurrently with this

Review Comment:
   Just `pop/try_pop`?  Or also `push`?
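
   For reference, a sketch of how the precondition comment might be expanded to answer that question explicitly. The method name and body below are assumptions (the accessor itself is not visible in this hunk); it only presumes the front element lives at `_next_pop`:

       // Un-synchronized access to the front element.  For this to be "safe":
       // 1) the caller logically guarantees that the queue is not empty
       // 2) Pop/TryPop cannot be called concurrently with this
       // 3) conservatively, Push is excluded as well: without the mutex there
       //    is no ordering between this read and a concurrent Push's writes,
       //    so whether it matters depends on how the caller learned that the
       //    queue is non-empty.
       const T& UnsyncFront() const { return _buffer[_next_pop]; }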


