You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yi...@apache.org on 2021/07/06 01:10:49 UTC
[arrow] branch master updated: ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally
This is an automated email from the ASF dual-hosted git repository.
yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d9092ec ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally
d9092ec is described below
commit d9092ec7e11c2a626f9086fedead475846b52356
Author: Weston Pace <we...@gmail.com>
AuthorDate: Tue Jul 6 09:09:16 2021 +0800
ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally
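As @cyb70289 pointed out, the test was timing-dependent and could fail when run on a slow CI machine. I changed the test to use condition variables instead of sleeps, so it should now be fully deterministic. The readahead generator was also changed to record end-of-stream or failure in a continuation (`Then`) instead of a callback (`AddCallback`), and the continuation's future is what gets queued. As a result, the `finished` flag is already set by the time the future handed to the caller completes.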
Closes #10602 from westonpace/bugfix/ARROW-13173--c-testasyncutil-readaheadfailed-asserts-occasi
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Yibo Cai <yi...@arm.com>
---
cpp/src/arrow/util/async_generator.h | 52 ++++++++++++++++--------------
cpp/src/arrow/util/async_generator_test.cc | 43 +++++++++++-------------
cpp/src/arrow/util/iterator.h | 1 -
3 files changed, 47 insertions(+), 49 deletions(-)
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index c99bd86..5a6321f 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -697,30 +697,38 @@ class ReadaheadGenerator {
ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
: state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
- Future<T> operator()() {
- // Copy so we can capture into lambdas
+ Future<T> AddMarkFinishedContinuation(Future<T> fut) {
auto state = state_;
- if (state->readahead_queue.empty()) {
+ return fut.Then(
+ [state](const T& result) -> Result<T> {
+ state->MarkFinishedIfDone(result);
+ return result;
+ },
+ [state](const Status& err) -> Result<T> {
+ state->finished.store(true);
+ return err;
+ });
+ }
+
+ Future<T> operator()() {
+ if (state_->readahead_queue.empty()) {
// This is the first request, let's pump the underlying queue
- for (int i = 0; i < state->max_readahead; i++) {
- auto next = state->source_generator();
- auto state = state_;
- next.AddCallback(
- [state](const Result<T>& result) { state->MarkFinishedIfDone(result); });
- state->readahead_queue.push(std::move(next));
+ for (int i = 0; i < state_->max_readahead; i++) {
+ auto next = state_->source_generator();
+ auto next_after_check = AddMarkFinishedContinuation(std::move(next));
+ state_->readahead_queue.push(std::move(next_after_check));
}
}
// Pop one and add one
- auto result = state->readahead_queue.front();
- state->readahead_queue.pop();
- if (state->finished.load()) {
- state->readahead_queue.push(AsyncGeneratorEnd<T>());
+ auto result = state_->readahead_queue.front();
+ state_->readahead_queue.pop();
+ if (state_->finished.load()) {
+ state_->readahead_queue.push(AsyncGeneratorEnd<T>());
} else {
- auto back_of_queue = state->source_generator();
- auto state = state_;
- back_of_queue.AddCallback(
- [state](const Result<T>& result) { state->MarkFinishedIfDone(result); });
- state->readahead_queue.push(std::move(back_of_queue));
+ auto back_of_queue = state_->source_generator();
+ auto back_of_queue_after_check =
+ AddMarkFinishedContinuation(std::move(back_of_queue));
+ state_->readahead_queue.push(std::move(back_of_queue_after_check));
}
return result;
}
@@ -732,13 +740,9 @@ class ReadaheadGenerator {
finished.store(false);
}
- void MarkFinishedIfDone(const Result<T>& next_result) {
- if (!next_result.ok()) {
+ void MarkFinishedIfDone(const T& next_result) {
+ if (IsIterationEnd(next_result)) {
finished.store(true);
- } else {
- if (IsIterationEnd(*next_result)) {
- finished.store(true);
- }
}
}
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 87c1737..361ce3e 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -1094,42 +1094,37 @@ TEST(TestAsyncUtil, ReadaheadMove) {
}
TEST(TestAsyncUtil, ReadaheadFailed) {
- ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(4));
+ ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20));
std::atomic<int32_t> counter(0);
+ auto gating_task = GatingTask::Make();
// All tasks are a little slow. The first task fails.
// The readahead will have spawned 9 more tasks and they
// should all pass
- auto source = [thread_pool, &counter]() -> Future<TestInt> {
+ auto source = [&]() -> Future<TestInt> {
auto count = counter++;
- return *thread_pool->Submit([count]() -> Result<TestInt> {
+ return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
+ gating_task->Task()();
if (count == 0) {
return Status::Invalid("X");
}
return TestInt(count);
- });
+ }));
};
auto readahead = MakeReadaheadGenerator<TestInt>(source, 10);
- ASSERT_FINISHES_AND_RAISES(Invalid, readahead());
- SleepABit();
-
- for (int i = 0; i < 9; i++) {
- ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead());
- ASSERT_EQ(TestInt(i + 1), next_val);
+ auto should_be_invalid = readahead();
+ // Polling once should allow 10 additional calls to start
+ ASSERT_OK(gating_task->WaitForRunning(11));
+ ASSERT_OK(gating_task->Unlock());
+
+ // Once unlocked the error task should always be the first. Some number of successful
+ // tasks may follow until the end.
+ ASSERT_FINISHES_AND_RAISES(Invalid, should_be_invalid);
+
+ ASSERT_FINISHES_OK_AND_ASSIGN(auto remaining_results, CollectAsyncGenerator(readahead));
+ // Don't need to know the exact number of successful tasks (and it may vary)
+ for (std::size_t i = 0; i < remaining_results.size(); i++) {
+ ASSERT_EQ(TestInt(static_cast<int>(i) + 1), remaining_results[i]);
}
- ASSERT_FINISHES_OK_AND_ASSIGN(auto after, readahead());
-
- // It's possible that finished was set quickly and there
- // are only 10 elements
- if (IsIterationEnd(after)) {
- return;
- }
-
- // It's also possible that finished was too slow and there
- // ended up being 11 elements
- ASSERT_EQ(TestInt(10), after);
- // There can't be 12 elements because SleepABit will prevent it
- ASSERT_FINISHES_OK_AND_ASSIGN(auto definitely_last, readahead());
- ASSERT_TRUE(IsIterationEnd(definitely_last));
}
class EnumeratorTestFixture : public GeneratorTestFixture {
diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h
index b82021e..2f42803 100644
--- a/cpp/src/arrow/util/iterator.h
+++ b/cpp/src/arrow/util/iterator.h
@@ -20,7 +20,6 @@
#include <cassert>
#include <functional>
#include <memory>
-#include <queue>
#include <tuple>
#include <type_traits>
#include <utility>