Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/05/14 16:47:22 UTC

[GitHub] [arrow] pitrou opened a new pull request #7179: ARROW-8732: [C++] Add basic cancellation API

pitrou opened a new pull request #7179:
URL: https://github.com/apache/arrow/pull/7179


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745294347


   An advantage of the "unified" model is that it should be less expensive to create a single token (even if it can handle multiple callbacks) than one at each delegation level.
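A minimal sketch of the "unified" model, assuming hypothetical names (`SimpleStopToken`, `ReadChunks`, `ReadFile`, `ReadDataset`) that are not part of this PR: the caller creates one token and the same reference is passed down through every delegation level, instead of each level allocating its own token and forwarding cancellation to it.

```cpp
#include <atomic>

// Hypothetical minimal token: one atomic flag shared by every level.
class SimpleStopToken {
 public:
  void RequestStop() { requested_.store(true, std::memory_order_release); }
  bool IsStopRequested() const {
    return requested_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<bool> requested_{false};
};

// Innermost level: polls the shared token between chunks and returns
// the number of chunks it processed.
int ReadChunks(const SimpleStopToken& token, int num_chunks) {
  int processed = 0;
  for (int i = 0; i < num_chunks && !token.IsStopRequested(); ++i) {
    ++processed;
  }
  return processed;
}

// Intermediate level: forwards the same token instead of creating its own.
int ReadFile(const SimpleStopToken& token, int num_chunks) {
  return ReadChunks(token, num_chunks);
}

// Outer level: still no new token; only the original caller creates one.
int ReadDataset(const SimpleStopToken& token, int num_files, int chunks_per_file) {
  int total = 0;
  for (int f = 0; f < num_files && !token.IsStopRequested(); ++f) {
    total += ReadFile(token, chunks_per_file);
  }
  return total;
}
```

Only one allocation and one flag exist regardless of nesting depth; the cost is that every level must agree on the same token type.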



[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r428087408



##########
File path: cpp/src/arrow/util/cancel.cc
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <utility>
+
+#include "arrow/util/cancel.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+StopCallback::StopCallback(StopToken* token, Callable cb)
+    : token_(token), cb_(std::move(cb)) {
+  if (token_ != nullptr) {
+    DCHECK(cb_);
+    // May call *this
+    token_->SetCallback(this);
+  }
+}
+
+StopCallback::~StopCallback() {
+  if (token_ != nullptr) {
+    token_->RemoveCallback(this);
+  }
+}
+
+StopCallback::StopCallback(StopCallback&& other) { *this = std::move(other); }
+
+StopCallback& StopCallback::operator=(StopCallback&& other) {
+  token_ = other.token_;
+  if (token_ != nullptr) {
+    other.token_ = nullptr;
+    token_->RemoveCallback(&other);
+  }
+  cb_ = std::move(other.cb_);
+  if (token_ != nullptr) {
+    // May call *this
+    token_->SetCallback(this);
+  }
+  return *this;
+}
+
+void StopCallback::Call(const Status& st) {
+  if (cb_) {
+    // Forget callable after calling it
+    Callable local_cb;
+    cb_.swap(local_cb);
+    local_cb(st);
+  }
+}
+
+struct StopToken::Impl {
+  std::mutex mutex_;
+  StopCallback* cb_ = nullptr;
+  bool requested_ = false;
+  Status cancel_error_;
+};
+
+StopToken::StopToken() : impl_(new Impl()) {}
+
+StopToken::~StopToken() {}
+
+Status StopToken::Poll() {
+  std::lock_guard<std::mutex> lock(impl_->mutex_);

Review comment:
       Done, though I will need to write threaded tests and check them with TSAN.





[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745288205


   One possible limitation is that it doesn't seem easy for the consumer to know whether the producer will actually notice a cancellation request (some producers may be unable to). One option would be to expose a `StopToken::Accept` method that the producer calls to signal that it is able to react to cancellation requests.
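A sketch of how the `StopToken::Accept` idea might work, under stated assumptions: `AcceptingStopToken`, `Accept()`, `IsAccepted()` and both producer functions are hypothetical illustrations, not code from this PR. A producer that can react to cancellation calls `Accept()`; the consumer checks `IsAccepted()` before relying on `RequestStop()` having any effect.

```cpp
#include <atomic>

// Hypothetical token with an opt-in "accept" handshake.
class AcceptingStopToken {
 public:
  // Called by a producer that promises to poll the token.
  void Accept() { accepted_.store(true); }
  // Lets the consumer know whether RequestStop() can have any effect.
  bool IsAccepted() const { return accepted_.load(); }

  void RequestStop() { requested_.store(true); }
  bool IsStopRequested() const { return requested_.load(); }

 private:
  std::atomic<bool> accepted_{false};
  std::atomic<bool> requested_{false};
};

// A producer that reacts to cancellation: it accepts, then polls.
int CancellableProducer(AcceptingStopToken* token, int n) {
  token->Accept();
  int produced = 0;
  while (produced < n && !token->IsStopRequested()) ++produced;
  return produced;
}

// A producer that cannot observe the token (e.g. stuck in a blocking call).
int OpaqueProducer(AcceptingStopToken* /*token*/, int n) { return n; }
```

One caveat of this design: `IsAccepted()` is only meaningful once the producer has started, so a consumer checking too early would see `false` even for a cooperative producer.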



[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543334176



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -238,6 +242,22 @@ class Future {
     return impl_->Wait(seconds);
   }
 
+  template <typename... CancelArgs>
+  bool Cancel(CancelArgs&&... args) {
+    auto& stop_token = impl_->stop_token_;

Review comment:
       That's a good question. In Java and Python, a completed Future cannot be cancelled (note that the Python API is inspired by the Java API, AFAIK).
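A minimal sketch of the Java-style rule, using a hypothetical `ToyFuture` (not Arrow's `Future`): completion and cancellation are both transitions out of a single pending state, so whichever happens first wins and `Cancel()` on a completed future simply returns `false`. This also settles the race where cancellation is requested just as the operation finishes.

```cpp
#include <mutex>

// Hypothetical toy future state, not Arrow's Future.
class ToyFuture {
 public:
  enum class State { kPending, kFinished, kCancelled };

  // Returns true if this call completed the future.
  bool MarkFinished() { return Transition(State::kFinished); }

  // Returns true only if the future was still pending; a completed
  // (or already cancelled) future is left untouched.
  bool Cancel() { return Transition(State::kCancelled); }

  State state() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return state_;
  }

 private:
  bool Transition(State target) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (state_ != State::kPending) return false;  // first transition wins
    state_ = target;
    return true;
  }

  mutable std::mutex mutex_;
  State state_ = State::kPending;
};
```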





[GitHub] [arrow] westonpace commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543015642



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       Maybe I answered my own question. The executor might benefit from freeing resources sooner rather than later (e.g. to keep queues short and efficient), and it can choose to ignore the stop token entirely without any consequence, so this doesn't put much burden on implementors of the executor interface.
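A sketch of an executor that uses an optional token to drop queued work early; `ToyStopFlag` and `ToyQueueExecutor` are hypothetical and far simpler than Arrow's `ThreadPool`. A task whose token was stop-requested before it started is discarded instead of run. An executor that ignored the token altogether would still be correct, since the task itself can poll it.

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical minimal token (not Arrow's StopToken).
struct ToyStopFlag {
  std::atomic<bool> requested{false};
  void RequestStop() { requested.store(true); }
  bool IsStopRequested() const { return requested.load(); }
};

// Hypothetical single-threaded executor: tasks are queued, then run later.
class ToyQueueExecutor {
 public:
  void Spawn(std::function<void()> task, const ToyStopFlag* token = nullptr) {
    queue_.push_back({std::move(task), token});
  }

  // Runs queued tasks in FIFO order, dropping those whose token was
  // stop-requested before they started. Returns the number of tasks run.
  int RunAll() {
    int ran = 0;
    while (!queue_.empty()) {
      Entry entry = std::move(queue_.front());
      queue_.pop_front();
      if (entry.token != nullptr && entry.token->IsStopRequested()) {
        continue;  // release the task's resources without running it
      }
      entry.task();
      ++ran;
    }
    return ran;
  }

 private:
  struct Entry {
    std::function<void()> task;
    const ToyStopFlag* token;
  };
  std::deque<Entry> queue_;
};
```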





[GitHub] [arrow] fsaintjacques commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r428615006



##########
File path: cpp/src/arrow/util/cancel.h
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class StopToken;
+
+// A RAII wrapper that automatically registers and unregisters
+// a callback to a StopToken.
+class ARROW_MUST_USE_TYPE ARROW_EXPORT StopCallback {
+ public:
+  using Callable = std::function<void(const Status&)>;
+  StopCallback(StopToken* token, Callable cb);
+  ~StopCallback();
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopCallback);
+  StopCallback(StopCallback&&);
+  StopCallback& operator=(StopCallback&&);
+
+  void Call(const Status&);
+
+ protected:
+  StopToken* token_;
+  Callable cb_;
+};
+
+class ARROW_EXPORT StopToken {
+ public:
+  StopToken();
+  ~StopToken();
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopToken);
+
+  Status Poll();
+  bool IsStopRequested();
+
+  void RequestStop();
+  void RequestStop(std::string message);
+  void RequestStop(Status error);
+
+  // Register a callback that will be called whenever cancellation happens.
+  // Note the callback may be called immediately, if cancellation was already
+  // requested.  The callback will be unregistered when the returned object
+  // is destroyed.
+  StopCallback SetCallback(StopCallback::Callable cb);

Review comment:
       Is there a reason why this is not part of the constructor? Do you foresee cases where we need to change the callback dynamically?

##########
File path: cpp/src/arrow/util/cancel.h
##########
@@ -0,0 +1,81 @@
+class ARROW_EXPORT StopToken {
+ public:
+  StopToken();
+  ~StopToken();
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopToken);
+
+  Status Poll();

Review comment:
       At the very least, document which methods are blocking. Do you think we should always make this explicit in the naming, e.g. `RequestStopBlocking()`?

##########
File path: cpp/src/arrow/util/cancel_test.cc
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <random>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/cancel.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+static constexpr double kLongWait = 5;  // seconds
+
+class CancelTest : public ::testing::Test {};
+
+TEST_F(CancelTest, TokenBasics) {
+  {
+    StopToken token;
+    ASSERT_FALSE(token.IsStopRequested());
+    ASSERT_OK(token.Poll());
+
+    token.RequestStop();
+    ASSERT_TRUE(token.IsStopRequested());
+    ASSERT_RAISES(Cancelled, token.Poll());
+  }
+  {
+    StopToken token;
+    token.RequestStop(Status::IOError("Operation cancelled"));
+    ASSERT_TRUE(token.IsStopRequested());
+    ASSERT_RAISES(IOError, token.Poll());
+  }
+}
+
+TEST_F(CancelTest, RequestStopTwice) {
+  StopToken token;
+  token.RequestStop();
+  // Second RequestStop() call is ignored
+  token.RequestStop(Status::IOError("Operation cancelled"));
+  ASSERT_TRUE(token.IsStopRequested());
+  ASSERT_RAISES(Cancelled, token.Poll());
+}
+
+TEST_F(CancelTest, SetCallback) {
+  std::vector<int> results;
+  StopToken token;
+  {
+    const auto cb = token.SetCallback([&](const Status& st) { results.push_back(1); });
+    ASSERT_EQ(results.size(), 0);
+  }
+  {
+    const auto cb = token.SetCallback([&](const Status& st) { results.push_back(1); });
+    ASSERT_EQ(results.size(), 0);
+    token.RequestStop();
+    ASSERT_EQ(results, std::vector<int>{1});
+    token.RequestStop();
+    ASSERT_EQ(results, std::vector<int>{1});
+  }
+  {
+    const auto cb = token.SetCallback([&](const Status& st) { results.push_back(2); });
+    ASSERT_EQ(results, std::vector<int>({1, 2}));
+    token.RequestStop();
+    ASSERT_EQ(results, std::vector<int>({1, 2}));
+  }
+}

Review comment:
       Add a case where no callback is set on the token: request cancellation first, then set a callback afterwards, since this behavior is documented.
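To make the requested case concrete without depending on Arrow, here is a self-contained stand-in, `ToyStopToken` (hypothetical; it uses a `std::string` message in place of `arrow::Status` and supports a single callback), that follows the semantics the tests above exercise: the first `RequestStop()` wins, and a callback set after cancellation is invoked immediately.

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the PR's StopToken.
class ToyStopToken {
 public:
  using Callback = std::function<void(const std::string&)>;

  void RequestStop(std::string message = "Operation cancelled") {
    Callback cb;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (requested_) return;  // first request wins, later ones are ignored
      requested_ = true;
      message_ = std::move(message);
      cb = std::move(cb_);
      cb_ = nullptr;
    }
    if (cb) cb(message_);  // invoke outside the lock
  }

  // If a stop was already requested, the callback runs immediately
  // instead of being stored.
  void SetCallback(Callback cb) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (requested_) {
      lock.unlock();
      cb(message_);  // message_ never changes once requested_ is set
      return;
    }
    cb_ = std::move(cb);
  }

 private:
  std::mutex mutex_;
  bool requested_ = false;
  std::string message_;
  Callback cb_;
};
```

The test below checks that the callback fires exactly once, with the first request's message; a test against the real `StopToken` would likely have the same shape using `ASSERT_EQ` and a `Status`.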

##########
File path: cpp/src/arrow/util/cancel_test.cc
##########
@@ -0,0 +1,202 @@
+TEST_F(CancelTest, StopCallbackMove) {
+  std::vector<int> results;
+  StopToken token;
+
+  StopCallback cb1(&token, [&](const Status& st) { results.push_back(1); });
+  const auto cb2 = std::move(cb1);
+
+  ASSERT_EQ(results.size(), 0);
+  token.RequestStop();
+  ASSERT_EQ(results, std::vector<int>{1});
+}
+
+TEST_F(CancelTest, ThreadedPollSuccess) {
+  constexpr int kNumThreads = 3;
+  constexpr int kMaxLoops = 30000;
+
+  std::vector<Status> results(kNumThreads);
+  std::vector<std::thread> threads;
+
+  StopToken token;
+  auto barrier = Future<void>::Make();
+
+  const auto worker_func = [&](int thread_num) {
+    ARROW_CHECK(barrier.Wait(kLongWait));
+    for (int i = 0; token.Poll().ok() && i < kMaxLoops; ++i) {
+    }
+    results[thread_num] = token.Poll();
+  };
+  for (int i = 0; i < kNumThreads; ++i) {
+    threads.emplace_back(std::bind(worker_func, i));
+  }
+
+  // Let the threads start
+  SleepFor(1e-3);
+  // Unblock all threads
+  barrier.MarkFinished();
+  for (auto& thread : threads) {
+    thread.join();
+  }
+
+  for (const auto& st : results) {
+    ASSERT_OK(st);
+  }
+}
+
+TEST_F(CancelTest, ThreadedPollCancel) {
+  constexpr int kNumThreads = 3;
+  constexpr int kMaxLoops = 1000000;
+
+  std::vector<Status> results(kNumThreads);
+  std::vector<std::thread> threads;
+
+  StopToken token;
+  auto barrier = Future<void>::Make();
+
+  const auto worker_func = [&](int thread_num) {
+    ARROW_CHECK(barrier.Wait(kLongWait));
+    for (int i = 0; token.Poll().ok() && i < kMaxLoops; ++i) {
+    }
+    results[thread_num] = token.Poll();
+  };
+
+  for (int i = 0; i < kNumThreads; ++i) {
+    threads.emplace_back(std::bind(worker_func, i));
+  }
+  // Let the threads start
+  SleepFor(1e-3);
+  // Unblock all threads
+  barrier.MarkFinished();
+  token.RequestStop(Status::IOError("Operation cancelled"));
+  for (auto& thread : threads) {
+    thread.join();
+  }
+
+  for (const auto& st : results) {
+    ASSERT_RAISES(IOError, st);
+  }
+}
+
+TEST_F(CancelTest, ThreadedSetCallbackCancel) {
+  constexpr int kIterations = 100;
+  constexpr double kMaxWait = 1e-3;
+
+  std::default_random_engine gen(42);
+  std::uniform_real_distribution<double> wait_dist(0.0, kMaxWait);
+
+  for (int i = 0; i < kIterations; ++i) {
+    Status result;
+
+    StopToken token;
+    auto barrier = Future<void>::Make();
+
+    const auto worker_func = [&]() {
+      ARROW_CHECK(barrier.Wait(kLongWait));
+      token.RequestStop(Status::IOError("Operation cancelled"));
+    };
+    std::thread thread(worker_func);
+
+    // Unblock thread
+    barrier.MarkFinished();
+    // Use a variable wait time to maximize potential synchronization issues
+    const auto wait_time = wait_dist(gen);
+    if (wait_time > kMaxWait * 0.5) {
+      SleepFor(wait_time);

Review comment:
       Instead of sleeping, just use a second barrier that the cancelling thread marks finished after the `RequestStop` call.
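A sketch of the two-barrier suggestion, using `std::promise<void>`/`std::future<void>` as stand-ins for the `Future<void>` barriers in the test; `FlagToken` and `RunTwoBarrierScenario` are hypothetical. The main thread waits on the second barrier, so it is guaranteed to observe the stop request without any sleep.

```cpp
#include <atomic>
#include <future>
#include <thread>

// Hypothetical minimal token (not Arrow's StopToken).
struct FlagToken {
  std::atomic<bool> requested{false};
  void RequestStop() { requested.store(true); }
  bool IsStopRequested() const { return requested.load(); }
};

// Returns whether the main thread observed the stop request once it
// got past the second barrier.
bool RunTwoBarrierScenario() {
  FlagToken token;
  std::promise<void> start;    // first barrier: unblocks the cancelling thread
  std::promise<void> stopped;  // second barrier: set after RequestStop()
  std::future<void> start_f = start.get_future();
  std::future<void> stopped_f = stopped.get_future();

  std::thread canceller([&] {
    start_f.wait();
    token.RequestStop();
    stopped.set_value();
  });

  start.set_value();
  stopped_f.wait();  // deterministic ordering, no timing assumptions
  const bool observed = token.IsStopRequested();
  canceller.join();
  return observed;
}
```

The trade-off is that this only ever exercises the "stop already requested" ordering; it never registers a callback while `RequestStop()` is still running on the other thread.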





[GitHub] [arrow] westonpace commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543011950



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -238,6 +242,22 @@ class Future {
     return impl_->Wait(seconds);
   }
 
+  template <typename... CancelArgs>
+  bool Cancel(CancelArgs&&... args) {
+    auto& stop_token = impl_->stop_token_;

Review comment:
       What if the future has already been marked completed? This could happen because of a timing issue (the user requests cancellation just as the operation is finishing), or, once we mix this in with continuations, when a chain of futures is cancelled from the end: the cancellation propagates backwards through the chain, and the first few futures may have already completed.
   
   We could address the second case when merging that change in, too.
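For the continuation case, a minimal single-threaded model (hypothetical `StageState` and `CancelChainFromEnd`, not Arrow's continuation API) shows backward propagation that leaves already-completed stages alone. It assumes a linear chain, where a finished stage implies every earlier stage has finished too, so propagation can stop at the first non-pending stage.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical state of one stage in a linear chain of futures.
enum class StageState { kPending, kFinished, kCancelled };

// Cancels from the last stage backwards, stopping at the first stage
// that is no longer pending (earlier stages must then be done as well).
// Returns how many stages were actually cancelled.
int CancelChainFromEnd(std::vector<StageState>& chain) {
  int cancelled = 0;
  for (std::size_t i = chain.size(); i-- > 0;) {
    if (chain[i] != StageState::kPending) break;
    chain[i] = StageState::kCancelled;
    ++cancelled;
  }
  return cancelled;
}
```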





[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r428135154



##########
File path: cpp/src/arrow/util/cancel.cc
##########
@@ -0,0 +1,130 @@
+struct StopToken::Impl {
+  std::mutex mutex_;
+  StopCallback* cb_ = nullptr;
+  bool requested_ = false;
+  Status cancel_error_;
+};
+
+StopToken::StopToken() : impl_(new Impl()) {}
+
+StopToken::~StopToken() {}
+
+Status StopToken::Poll() {
+  std::lock_guard<std::mutex> lock(impl_->mutex_);

Review comment:
       Threaded tests added.





[GitHub] [arrow] github-actions[bot] commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-628764727


   https://issues.apache.org/jira/browse/ARROW-8732



[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745290552


   Note also that in this PR, `StopToken` can only have a single callback. We would have to change that if we want a single token to be potentially listened to by multiple producers.
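A sketch of what lifting the single-callback limitation could look like; `MultiCallbackToken`, `AddCallback` and `RemoveCallback` are hypothetical names, not this PR's API. Each producer registers its own callback and receives an id it can later use to unregister; all pending callbacks run once, outside the lock, on the first stop request.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical token supporting several callbacks, keyed by registration id.
class MultiCallbackToken {
 public:
  using Callback = std::function<void()>;
  using Id = std::uint64_t;

  // Registers cb. If a stop was already requested, calls it immediately
  // and returns 0 (nothing to unregister).
  Id AddCallback(Callback cb) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (requested_) {
      lock.unlock();
      cb();
      return 0;
    }
    const Id id = next_id_++;
    callbacks_.emplace(id, std::move(cb));
    return id;
  }

  void RemoveCallback(Id id) {
    std::lock_guard<std::mutex> lock(mutex_);
    callbacks_.erase(id);
  }

  void RequestStop() {
    std::vector<Callback> to_call;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (requested_) return;  // only the first request triggers callbacks
      requested_ = true;
      for (auto& entry : callbacks_) to_call.push_back(std::move(entry.second));
      callbacks_.clear();
    }
    // Run outside the lock so callbacks may call back into the token.
    for (auto& cb : to_call) cb();
  }

 private:
  std::mutex mutex_;
  bool requested_ = false;
  Id next_id_ = 1;
  std::map<Id, Callback> callbacks_;
};
```

Unlike the PR's `StopCallback` destructor, this `RemoveCallback` does not wait for a callback already running on another thread, which a real implementation would probably need to handle.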



[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r539522546



##########
File path: cpp/src/arrow/util/cancel_test.cc
##########
@@ -0,0 +1,202 @@
+TEST_F(CancelTest, ThreadedSetCallbackCancel) {
+  constexpr int kIterations = 100;
+  constexpr double kMaxWait = 1e-3;
+
+  std::default_random_engine gen(42);
+  std::uniform_real_distribution<double> wait_dist(0.0, kMaxWait);
+
+  for (int i = 0; i < kIterations; ++i) {
+    Status result;
+
+    StopToken token;
+    auto barrier = Future<void>::Make();
+
+    const auto worker_func = [&]() {
+      ARROW_CHECK(barrier.Wait(kLongWait));
+      token.RequestStop(Status::IOError("Operation cancelled"));
+    };
+    std::thread thread(worker_func);
+
+    // Unblock thread
+    barrier.MarkFinished();
+    // Use a variable wait time to maximize potential synchronization issues
+    const auto wait_time = wait_dist(gen);
+    if (wait_time > kMaxWait * 0.5) {
+      SleepFor(wait_time);

Review comment:
       Hmm, no, the purpose is to instantiate the `StopCallback` at any possible time, including while the other thread is running `token.RequestStop(...)`.
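A minimal sketch of the race this test is meant to exercise. It uses a hypothetical `MiniStopToken` (not Arrow's actual class), with `std::string` standing in for `Status`. Because callback registration and stop requests are serialized by one mutex, the callback should run exactly once, whichever thread gets there first.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal stop token; std::string stands in for arrow::Status.
class MiniStopToken {
 public:
  using Callback = std::function<void(const std::string&)>;

  void RequestStop(std::string reason) {
    std::vector<Callback> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (requested_) return;  // only the first request counts
      requested_ = true;
      reason_ = std::move(reason);
      to_run.swap(callbacks_);
    }
    // Run callbacks outside the lock so they may safely use the token.
    for (auto& cb : to_run) cb(reason_);
  }

  void SetCallback(Callback cb) {
    assert(cb);
    std::unique_lock<std::mutex> lock(mutex_);
    if (!requested_) {
      callbacks_.push_back(std::move(cb));  // will run in RequestStop()
      return;
    }
    lock.unlock();
    cb(reason_);  // stop already requested: run immediately
  }

 private:
  std::mutex mutex_;
  bool requested_ = false;
  std::string reason_;  // never modified once requested_ is true
  std::vector<Callback> callbacks_;
};

// Registers a callback while another thread requests a stop, and returns
// how many times the callback ran.
int RaceOnce() {
  MiniStopToken token;
  std::atomic<int> calls{0};
  std::thread stopper([&] { token.RequestStop("Operation cancelled"); });
  token.SetCallback([&](const std::string&) { ++calls; });
  stopper.join();
  return calls.load();
}
```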







[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r539523917



##########
File path: cpp/src/arrow/util/cancel.h
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class StopToken;
+
+// A RAII wrapper that automatically registers and unregisters
+// a callback to a StopToken.
+class ARROW_MUST_USE_TYPE ARROW_EXPORT StopCallback {
+ public:
+  using Callable = std::function<void(const Status&)>;
+  StopCallback(StopToken* token, Callable cb);
+  ~StopCallback();
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopCallback);
+  StopCallback(StopCallback&&);
+  StopCallback& operator=(StopCallback&&);
+
+  void Call(const Status&);
+
+ protected:
+  StopToken* token_;
+  Callable cb_;
+};
+
+class ARROW_EXPORT StopToken {
+ public:
+  StopToken();
+  ~StopToken();
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopToken);
+
+  Status Poll();
+  bool IsStopRequested();
+
+  void RequestStop();
+  void RequestStop(std::string message);
+  void RequestStop(Status error);
+
+  // Register a callback that will be called whenever cancellation happens.
+  // Note the callback may be called immediately, if cancellation was already
+  // requested.  The callback will be unregistered when the returned object
+  // is destroyed.
+  StopCallback SetCallback(StopCallback::Callable cb);

Review comment:
       If the producer is sequencing several different operations, each with its own stop callback, this should be easier than having to write a single overall callback that dispatches to the right sub-callback.
   
   Or at least that's the thinking. We may want to revisit this later, once we start using this concretely :-)
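A sketch of what several registrations on one token could look like. All names (`MultiStopToken`, `Registration`) are hypothetical and not Arrow's API. Each sub-operation holds its own RAII handle, and only handles still alive when the stop is requested get called.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical token allowing several independent callbacks.
class MultiStopToken {
 public:
  // RAII handle: unregisters its callback when destroyed.
  class Registration {
   public:
    Registration(MultiStopToken* token, int id) : token_(token), id_(id) {}
    Registration(Registration&& other) noexcept
        : token_(other.token_), id_(other.id_) {
      other.token_ = nullptr;
    }
    Registration(const Registration&) = delete;
    Registration& operator=(const Registration&) = delete;
    Registration& operator=(Registration&&) = delete;
    ~Registration() {
      if (token_ != nullptr) token_->Remove(id_);
    }

   private:
    MultiStopToken* token_;
    int id_;
  };

  Registration SetCallback(std::function<void()> cb) {
    assert(cb);
    std::unique_lock<std::mutex> lock(mutex_);
    if (requested_) {
      lock.unlock();
      cb();  // already stopped: call now, nothing to register
      return Registration(nullptr, -1);
    }
    int id = next_id_++;
    callbacks_.emplace(id, std::move(cb));
    return Registration(this, id);
  }

  void RequestStop() {
    std::map<int, std::function<void()>> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (requested_) return;
      requested_ = true;
      to_run.swap(callbacks_);
    }
    for (auto& entry : to_run) entry.second();  // in registration order
  }

 private:
  void Remove(int id) {
    std::lock_guard<std::mutex> lock(mutex_);
    callbacks_.erase(id);
  }

  std::mutex mutex_;
  bool requested_ = false;
  int next_id_ = 0;
  std::map<int, std::function<void()>> callbacks_;
};
```

In this sketch, a `Registration` must not outlive the token it points to.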







[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r542448867



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       @lidavidm This PR, if accepted, will change the `Executor` interface slightly. You may want to take a look.







[GitHub] [arrow] westonpace commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543006149



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       Why expose the stop token to the executor at all? If I'm writing a custom executor, what do I do with the stop token? I see that the thread pool uses it to skip the task if it was cancelled before being scheduled, but conceivably this could also be achieved by wrapping the task, the same way `TaskGroup::AppendReal` does. Do you envision certain classes of custom executors wanting to do something different with the stop token?
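A sketch of the wrapping alternative. A shared `std::atomic<bool>` stands in for the stop token, and `WrapWithStopCheck` is a hypothetical helper (not the actual code of `TaskGroup::AppendReal`). The executor only ever sees a plain `std::function<void()>`.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for a stop token: a shared atomic flag.
using StopFlag = std::shared_ptr<std::atomic<bool>>;

// Wraps `task` so that it is skipped if a stop was requested before an
// executor gets around to running it. The executor never sees the flag.
std::function<void()> WrapWithStopCheck(std::function<void()> task,
                                        StopFlag stop) {
  assert(task && stop);
  return [task = std::move(task), stop = std::move(stop)]() {
    if (stop->load(std::memory_order_acquire)) return;  // cancelled early
    task();
  };
}
```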







[GitHub] [arrow] westonpace commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543589561



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       If the producer is creating the stop token, wouldn't the correct interface be:
   
   Result<StopToken*> SpawnReal(TaskHints hints, std::function<void()> task);
   
   Under that signature, the executor would be responsible for creating and returning a stop token. In the existing model, for example, how does the executor inform the caller that it is, in fact, cancellable?
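A sketch of the producer-issued variant suggested here. It uses hypothetical `DeferredExecutor` and `CancelHandle` types rather than Arrow's `Executor`. `Spawn()` hands back a handle, and in this sketch an executor that cannot cancel could signal that by returning a null handle instead.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical handle the executor returns to the caller.
class CancelHandle {
 public:
  void Cancel() { cancelled_.store(true); }
  bool IsCancelled() const { return cancelled_.load(); }

 private:
  std::atomic<bool> cancelled_{false};
};

// Toy single-threaded executor (not arrow::Executor): Spawn() queues a task
// and returns a handle; RunAll() later runs every task not cancelled.
class DeferredExecutor {
 public:
  std::shared_ptr<CancelHandle> Spawn(std::function<void()> task) {
    assert(task);
    auto handle = std::make_shared<CancelHandle>();
    queue_.push_back(Entry{std::move(task), handle});
    return handle;  // a non-cancellable executor could return nullptr
  }

  // Returns the number of tasks actually run.
  int RunAll() {
    int ran = 0;
    while (!queue_.empty()) {
      Entry entry = std::move(queue_.front());
      queue_.pop_front();
      if (entry.handle->IsCancelled()) continue;  // skipped, never started
      entry.task();
      ++ran;
    }
    return ran;
  }

 private:
  struct Entry {
    std::function<void()> task;
    std::shared_ptr<CancelHandle> handle;
  };
  std::deque<Entry> queue_;
};
```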







[GitHub] [arrow] fsaintjacques commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
fsaintjacques commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r428612802



##########
File path: cpp/src/arrow/status.h
##########
@@ -83,6 +83,7 @@ enum class StatusCode : char {
   IOError = 5,
   CapacityError = 6,
   IndexError = 7,
+  Cancelled = 8,

Review comment:
       I think it's fine: think about one thread erroring out and another receiving the cancel broadcast. It will be useful to distinguish real errors from cancel-induced ones.
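A sketch of why a separate code can help. It uses a deliberately simplified, hypothetical status type (not `arrow::Status`). When collecting worker results, the aggregator can report the first real error and ignore the ones induced by the cancel broadcast.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Deliberately simplified, hypothetical status type (not arrow::Status).
enum class Code { OK, IOError, Cancelled };

struct MiniStatus {
  Code code = Code::OK;
  std::string message;
};

// Returns the first error that was not caused by the cancel broadcast.
// Falls back to the first Cancelled status, or OK if every worker succeeded.
MiniStatus RootCause(const std::vector<MiniStatus>& results) {
  MiniStatus fallback;
  for (const auto& st : results) {
    if (st.code == Code::OK) continue;
    if (st.code != Code::Cancelled) return st;  // a real error wins
    if (fallback.code == Code::OK) fallback = st;
  }
  return fallback;
}
```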







[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r543328875



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       Yes, the idea is that it gives more freedom to the executor like this.







[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r425284648



##########
File path: cpp/src/arrow/status.h
##########
@@ -83,6 +83,7 @@ enum class StatusCode : char {
   IOError = 5,
   CapacityError = 6,
   IndexError = 7,
+  Cancelled = 8,

Review comment:
       I'm not sure this is useful. Cancellations have an underlying cause, so perhaps force the source of cancellation to pass the actual cause?







[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-781392458


   I rebased this on git master, but I will file a separate PR for the consumer-issued approach.





[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-789990634


   Closed, superseded by #9528.





[GitHub] [arrow] westonpace commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745486624


   > One possible limitation is that it doesn't seem easy for the consumer to know whether the producer will actually notice a cancellation request (some producers may be unable to?). Unless we expose a StopToken::Accept method that the producer calls to signal that it is able to react to cancellation requests.
   
   Correct, although I don't know of any cancellation framework that does support this.  I'm not sure what the caller is supposed to do with the information that cancel is not supported.
   
   One could imagine the user's "cancel" button being disabled if an operation cannot be cancelled, but it's hard to imagine a non-cancellable operation making it all the way up to the user layer. For example, as soon as a future is chained, the resulting future becomes cancellable (the second half of the chain might not run if cancellation happens during the first step). Any intermediate layer that checks the stop token to prevent follow-up work will hide the underlying non-cancellability.
   
   If a producer doesn't support cancellation (which could be very common; for example, there is typically no way to cancel an ongoing blocking I/O operation), then calling cancel can simply be a no-op.
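A sketch of how an intermediate layer hides a non-cancellable step. `RunChain` is a hypothetical helper, and a `std::atomic<bool>` stands in for the stop token. Individual steps run to completion once started, yet the chain still stops early because follow-up steps are skipped.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Runs the steps in order, checking the stop flag between steps.
// Returns how many steps were started (and therefore completed).
int RunChain(const std::vector<std::function<void()>>& steps,
             const std::atomic<bool>& stop) {
  int completed = 0;
  for (const auto& step : steps) {
    if (stop.load()) break;  // cancellation observed between steps
    step();                  // not interruptible once started
    ++completed;
  }
  return completed;
}
```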





[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r427297417



##########
File path: cpp/src/arrow/util/cancel.cc
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <utility>
+
+#include "arrow/util/cancel.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+StopCallback::StopCallback(StopToken* token, Callable cb)
+    : token_(token), cb_(std::move(cb)) {
+  if (token_ != nullptr) {
+    DCHECK(cb_);
+    // May call *this
+    token_->SetCallback(this);
+  }
+}
+
+StopCallback::~StopCallback() {
+  if (token_ != nullptr) {
+    token_->RemoveCallback(this);
+  }
+}
+
+StopCallback::StopCallback(StopCallback&& other) { *this = std::move(other); }
+
+StopCallback& StopCallback::operator=(StopCallback&& other) {
+  token_ = other.token_;
+  if (token_ != nullptr) {
+    other.token_ = nullptr;
+    token_->RemoveCallback(&other);
+  }
+  cb_ = std::move(other.cb_);
+  if (token_ != nullptr) {
+    // May call *this
+    token_->SetCallback(this);
+  }
+  return *this;
+}
+
+void StopCallback::Call(const Status& st) {
+  if (cb_) {
+    // Forget callable after calling it
+    Callable local_cb;
+    cb_.swap(local_cb);
+    local_cb(st);
+  }
+}
+
+struct StopToken::Impl {
+  std::mutex mutex_;
+  StopCallback* cb_ = nullptr;
+  bool requested_ = false;
+  Status cancel_error_;
+};
+
+StopToken::StopToken() : impl_(new Impl()) {}
+
+StopToken::~StopToken() {}
+
+Status StopToken::Poll() {
+  std::lock_guard<std::mutex> lock(impl_->mutex_);

Review comment:
       We may be able to avoid the mutex lock here by using an atomic bool for `requested_`.
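A sketch of that idea, using a hypothetical `FastPollToken` with a `std::string` cause in place of `Status`. `Poll()` touches only an atomic flag on the common path. The cause is written before the flag is released, so a reader that observes the flag with acquire ordering also sees the cause. The mutex then only serializes concurrent `RequestStop()` calls.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical token whose Poll() avoids the mutex entirely.
class FastPollToken {
 public:
  // Returns an empty string while no stop was requested, else the cause.
  std::string Poll() const {
    if (!requested_.load(std::memory_order_acquire)) return {};  // lock-free
    return cause_;  // safe: never modified once requested_ is true
  }

  void RequestStop(std::string cause) {
    assert(!cause.empty());
    std::lock_guard<std::mutex> lock(mutex_);  // serializes writers only
    if (requested_.load(std::memory_order_relaxed)) return;  // first wins
    cause_ = std::move(cause);
    requested_.store(true, std::memory_order_release);
  }

 private:
  std::mutex mutex_;
  std::atomic<bool> requested_{false};
  std::string cause_;
};
```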







[GitHub] [arrow] westonpace commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745031743


       So for discussion & comparison's sake, I will point out that this approach is a little different from C#'s "unified" approach [1] and more in line with Java's approach (minus Java's added complexity around "interrupts").
   
   In the C# model, the stop token is provided by the consumer, which is a little counter-intuitive but does have some advantages. An intermediate layer's only responsibility is to receive the cancellation token and pass it down to all lower-layer calls. For example, consider an intermediate parsing/aggregation layer that makes multiple concurrent calls to load and parse columns of data. With the current approach, it would have to keep track of several child stop tokens, provide its own stop token (which it returns to the caller), listen to that token, and propagate any cancellation to all of the child tokens.
   
   The current model also means that ARROW-10182 will have to propagate cancellation up the callback chain: when a child future is cancelled, it should cancel the parent future that caused the child future to be created. This is the opposite of the direction in which errors propagate. I'm certain it can be done, but it adds complexity. In the unified model, all futures in the chain would be cancelled at once (since they share the same cancellation source), so there is no need to propagate.
   
   On the other hand, to play devil's advocate, the unified cancellation model can be confusing to the consumer. As I mentioned earlier, it is counter-intuitive for the consumer to provide the cancellation token. People are used to getting some kind of handle back from a long-running operation, which they can then use to cancel it.
   
   [1] https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads
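A sketch of the consumer-issued ("unified") model described above, loosely modelled on C#'s `CancellationTokenSource`/`CancellationToken` split. All C++ names (`Source`, `Token`, `ParseAll`, `ParseColumn`) are hypothetical. The intermediate layer creates no tokens of its own and simply forwards the one it was given.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

struct StopState {
  std::atomic<bool> requested{false};
};

// Cheap to copy; handed down through every layer.
class Token {
 public:
  explicit Token(std::shared_ptr<StopState> s) : state_(std::move(s)) {}
  bool IsStopRequested() const { return state_->requested.load(); }

 private:
  std::shared_ptr<StopState> state_;
};

// Kept by the consumer, who decides when to cancel.
class Source {
 public:
  Token token() const { return Token(state_); }
  void RequestStop() { state_->requested.store(true); }

 private:
  std::shared_ptr<StopState> state_ = std::make_shared<StopState>();
};

// Lower layer: "parses" one column unless asked to stop.
bool ParseColumn(int /*column*/, const Token& token) {
  return !token.IsStopRequested();
}

// Intermediate layer: no tokens to create, track or propagate; it just
// passes the caller's token to every child call.
int ParseAll(int num_columns, const Token& token) {
  int parsed = 0;
  for (int i = 0; i < num_columns; ++i) {
    if (ParseColumn(i, token)) ++parsed;
  }
  return parsed;
}
```

A single `RequestStop()` on the consumer's `Source` is seen by every layer at once, which is the "no need to propagate" property described above.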





[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-745283125


   Thanks for the insight @westonpace . I agree the "unified" model looks attractive. Let me think about it.





[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-628756628


   @fsaintjacques @bkietz Comments welcome on this draft.





[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-742000105


   @bkietz @westonpace I updated this draft PR and added integration with futures. I'm not sure about the overall API and control flow, feel free to discuss.





[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r544166587



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       Well, the stop token is created in `Executor::Submit` since it's embedded in the Future. 







[GitHub] [arrow] pitrou commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r539524089



##########
File path: cpp/src/arrow/util/cancel.h
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class StopToken;
+
+// A RAII wrapper that automatically registers and unregisters
+// a callback to a StopToken.
+class ARROW_MUST_USE_TYPE ARROW_EXPORT StopCallback {
+ public:
+  using Callable = std::function<void(const Status&)>;
+  StopCallback(StopToken* token, Callable cb);
+  ~StopCallback();
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopCallback);
+  StopCallback(StopCallback&&);
+  StopCallback& operator=(StopCallback&&);
+
+  void Call(const Status&);
+
+ protected:
+  StopToken* token_;
+  Callable cb_;
+};
+
+class ARROW_EXPORT StopToken {
+ public:
+  StopToken();
+  ~StopToken();
+  ARROW_DISALLOW_COPY_AND_ASSIGN(StopToken);
+
+  Status Poll();

Review comment:
       I should add docstrings in any case.







[GitHub] [arrow] pitrou commented on pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#issuecomment-746019377


   I'll probably try to submit another PR with the "unified" model (i.e. consumer-issued stop token), though that may be after my end-of-year vacation.





[GitHub] [arrow] lidavidm commented on a change in pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #7179:
URL: https://github.com/apache/arrow/pull/7179#discussion_r542465782



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -218,7 +220,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
 
   ThreadPool();
 
-  Status SpawnReal(TaskHints hints, std::function<void()> task) override;
+  Status SpawnReal(TaskHints hints, std::function<void()> task,
+                   StopToken* = NULLPTR) override;

Review comment:
       Thanks for the heads-up. This looks good; we don't yet use the `Executor` interface, but we will move to it once we start looking at upstreaming/open-sourcing our project.







[GitHub] [arrow] pitrou closed pull request #7179: ARROW-8732: [C++] Add basic cancellation API

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #7179:
URL: https://github.com/apache/arrow/pull/7179


   

