You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yi...@apache.org on 2021/07/06 01:10:49 UTC

[arrow] branch master updated: ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally

This is an automated email from the ASF dual-hosted git repository.

yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d9092ec  ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally
d9092ec is described below

commit d9092ec7e11c2a626f9086fedead475846b52356
Author: Weston Pace <we...@gmail.com>
AuthorDate: Tue Jul 6 09:09:16 2021 +0800

    ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally
    
    As @cyb70289 pointed out, the test was dependent on timing, and when running on a slow CI machine it could lead to failure.  I changed the test to use condition variables instead of sleeps so that it should be fully deterministic now.
    
    Closes #10602 from westonpace/bugfix/ARROW-13173--c-testasyncutil-readaheadfailed-asserts-occasi
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Yibo Cai <yi...@arm.com>
---
 cpp/src/arrow/util/async_generator.h       | 52 ++++++++++++++++--------------
 cpp/src/arrow/util/async_generator_test.cc | 43 +++++++++++-------------
 cpp/src/arrow/util/iterator.h              |  1 -
 3 files changed, 47 insertions(+), 49 deletions(-)

diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index c99bd86..5a6321f 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -697,30 +697,38 @@ class ReadaheadGenerator {
   ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
       : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
 
-  Future<T> operator()() {
-    // Copy so we can capture into lambdas
+  Future<T> AddMarkFinishedContinuation(Future<T> fut) {
     auto state = state_;
-    if (state->readahead_queue.empty()) {
+    return fut.Then(
+        [state](const T& result) -> Result<T> {
+          state->MarkFinishedIfDone(result);
+          return result;
+        },
+        [state](const Status& err) -> Result<T> {
+          state->finished.store(true);
+          return err;
+        });
+  }
+
+  Future<T> operator()() {
+    if (state_->readahead_queue.empty()) {
       // This is the first request, let's pump the underlying queue
-      for (int i = 0; i < state->max_readahead; i++) {
-        auto next = state->source_generator();
-        auto state = state_;
-        next.AddCallback(
-            [state](const Result<T>& result) { state->MarkFinishedIfDone(result); });
-        state->readahead_queue.push(std::move(next));
+      for (int i = 0; i < state_->max_readahead; i++) {
+        auto next = state_->source_generator();
+        auto next_after_check = AddMarkFinishedContinuation(std::move(next));
+        state_->readahead_queue.push(std::move(next_after_check));
       }
     }
     // Pop one and add one
-    auto result = state->readahead_queue.front();
-    state->readahead_queue.pop();
-    if (state->finished.load()) {
-      state->readahead_queue.push(AsyncGeneratorEnd<T>());
+    auto result = state_->readahead_queue.front();
+    state_->readahead_queue.pop();
+    if (state_->finished.load()) {
+      state_->readahead_queue.push(AsyncGeneratorEnd<T>());
     } else {
-      auto back_of_queue = state->source_generator();
-      auto state = state_;
-      back_of_queue.AddCallback(
-          [state](const Result<T>& result) { state->MarkFinishedIfDone(result); });
-      state->readahead_queue.push(std::move(back_of_queue));
+      auto back_of_queue = state_->source_generator();
+      auto back_of_queue_after_check =
+          AddMarkFinishedContinuation(std::move(back_of_queue));
+      state_->readahead_queue.push(std::move(back_of_queue_after_check));
     }
     return result;
   }
@@ -732,13 +740,9 @@ class ReadaheadGenerator {
       finished.store(false);
     }
 
-    void MarkFinishedIfDone(const Result<T>& next_result) {
-      if (!next_result.ok()) {
+    void MarkFinishedIfDone(const T& next_result) {
+      if (IsIterationEnd(next_result)) {
         finished.store(true);
-      } else {
-        if (IsIterationEnd(*next_result)) {
-          finished.store(true);
-        }
       }
     }
 
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 87c1737..361ce3e 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -1094,42 +1094,37 @@ TEST(TestAsyncUtil, ReadaheadMove) {
 }
 
 TEST(TestAsyncUtil, ReadaheadFailed) {
-  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(4));
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
   std::atomic<int32_t> counter(0);
+  auto gating_task = GatingTask::Make();
   // All tasks are a little slow.  The first task fails.
   // The readahead will have spawned 9 more tasks and they
   // should all pass
-  auto source = [thread_pool, &counter]() -> Future<TestInt> {
+  auto source = [&]() -> Future<TestInt> {
     auto count = counter++;
-    return *thread_pool->Submit([count]() -> Result<TestInt> {
+    return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
+      gating_task->Task()();
       if (count == 0) {
         return Status::Invalid("X");
       }
       return TestInt(count);
-    });
+    }));
   };
   auto readahead = MakeReadaheadGenerator<TestInt>(source, 10);
-  ASSERT_FINISHES_AND_RAISES(Invalid, readahead());
-  SleepABit();
-
-  for (int i = 0; i < 9; i++) {
-    ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead());
-    ASSERT_EQ(TestInt(i + 1), next_val);
+  auto should_be_invalid = readahead();
+  // Polling once should allow 10 additional calls to start
+  ASSERT_OK(gating_task->WaitForRunning(11));
+  ASSERT_OK(gating_task->Unlock());
+
+  // Once unlocked the error task should always be the first.  Some number of successful
+  // tasks may follow until the end.
+  ASSERT_FINISHES_AND_RAISES(Invalid, should_be_invalid);
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto remaining_results, CollectAsyncGenerator(readahead));
+  // Don't need to know the exact number of successful tasks (and it may vary)
+  for (std::size_t i = 0; i < remaining_results.size(); i++) {
+    ASSERT_EQ(TestInt(static_cast<int>(i) + 1), remaining_results[i]);
   }
-  ASSERT_FINISHES_OK_AND_ASSIGN(auto after, readahead());
-
-  // It's possible that finished was set quickly and there
-  // are only 10 elements
-  if (IsIterationEnd(after)) {
-    return;
-  }
-
-  // It's also possible that finished was too slow and there
-  // ended up being 11 elements
-  ASSERT_EQ(TestInt(10), after);
-  // There can't be 12 elements because SleepABit will prevent it
-  ASSERT_FINISHES_OK_AND_ASSIGN(auto definitely_last, readahead());
-  ASSERT_TRUE(IsIterationEnd(definitely_last));
 }
 
 class EnumeratorTestFixture : public GeneratorTestFixture {
diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h
index b82021e..2f42803 100644
--- a/cpp/src/arrow/util/iterator.h
+++ b/cpp/src/arrow/util/iterator.h
@@ -20,7 +20,6 @@
 #include <cassert>
 #include <functional>
 #include <memory>
-#include <queue>
 #include <tuple>
 #include <type_traits>
 #include <utility>