You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/11 01:16:38 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #14227: ARROW-17837: [C++][Acero] Create ExecPlan-owned QueryContext that will store a plan's shared data structures

westonpace commented on code in PR #14227:
URL: https://github.com/apache/arrow/pull/14227#discussion_r1019672803


##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;

Review Comment:
   This seems to be forward declared in a few places (not just from this PR).  Maybe we can add it to `util/type_fwd.h`?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {
+ public:
+  QueryContext(QueryOptions opts = {},
+               ExecContext exec_context = *default_exec_context());
+
+  Status Init(size_t max_num_threads);
+
+  const ::arrow::internal::CpuInfo* cpu_info() const;
+  int64_t hardware_flags() const { return cpu_info()->hardware_flags(); }
+  const QueryOptions& options() const { return options_; }
+  MemoryPool* memory_pool() const { return exec_context_.memory_pool(); }
+  ::arrow::internal::Executor* executor() const { return exec_context_.executor(); }
+  ExecContext* exec_context() { return &exec_context_; }
+  IOContext* io_context() { return &io_context_; }
+  TaskScheduler* scheduler() { return task_scheduler_.get(); }
+  util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }
+
+  size_t GetThreadIndex();
+  size_t max_concurrency() const;
+  Result<util::TempVectorStack*> GetTempStack(size_t thread_index);

Review Comment:
   This feels a little odd that it would lead to code like:
   
   ```
   query_context->GetTempStack(query_context->GetThreadIndex());
   ```
   
   In other words...why would the query context need the `thread_index` if it is the one providing the thread index?  My current best guess is because we expect the call to `GetThreadIndex()` to be somewhat costly?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;

Review Comment:
   Also...we will need to be very clear how we document this field :laughing:
   
   Users don't often have a solid understanding of all the places RAM could be used (scratch buffer, incoming I/O, outgoing I/O, fragmentation, etc.) and if we just leave this as `max_memory_bytes` then people will complain as soon as RSS goes above this number.



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {
+ public:
+  QueryContext(QueryOptions opts = {},
+               ExecContext exec_context = *default_exec_context());
+
+  Status Init(size_t max_num_threads);
+
+  const ::arrow::internal::CpuInfo* cpu_info() const;
+  int64_t hardware_flags() const { return cpu_info()->hardware_flags(); }

Review Comment:
   I'm pretty sure accessing `cpu_info()` in this way requires a fully defined type so maybe just get rid of the forward declaration above or move this method into a `.cc` file?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {
+ public:
+  QueryContext(QueryOptions opts = {},
+               ExecContext exec_context = *default_exec_context());
+
+  Status Init(size_t max_num_threads);
+
+  const ::arrow::internal::CpuInfo* cpu_info() const;
+  int64_t hardware_flags() const { return cpu_info()->hardware_flags(); }
+  const QueryOptions& options() const { return options_; }
+  MemoryPool* memory_pool() const { return exec_context_.memory_pool(); }
+  ::arrow::internal::Executor* executor() const { return exec_context_.executor(); }
+  ExecContext* exec_context() { return &exec_context_; }
+  IOContext* io_context() { return &io_context_; }
+  TaskScheduler* scheduler() { return task_scheduler_.get(); }
+  util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }
+
+  size_t GetThreadIndex();
+  size_t max_concurrency() const;
+  Result<util::TempVectorStack*> GetTempStack(size_t thread_index);
+
+  /// \brief Start an external task
+  ///
+  /// This should be avoided if possible.  It is kept in for now for legacy
+  /// purposes.  This should be called before the external task is started.  If
+  /// a valid future is returned then it should be marked complete when the
+  /// external task has finished.
+  ///
+  /// \return an invalid future if the plan has already ended, otherwise this
+  ///         returns a future that must be completed when the external task
+  ///         finishes.
+  Result<Future<>> BeginExternalTask();
+
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  /// \brief Add a single function as a task to the query's task group on
+  ///        the IO thread pool
+  ///
+  /// \param fn The task to run. Returns a status.
+  Status ScheduleIOTask(std::function<Status()> fn);
+
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Register a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+
+  /// \brief Start the task group with the specified ID. This can only
+  ///        be called once per task_group_id.
+  ///
+  /// \param task_group_id The ID  of the task group to run
+  /// \param num_tasks The number of times to run the task
+  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+
+  // This is an RAII class for keeping track of in-flight file IO. Useful for getting
+  // an estimate of memory use, and how much memory we expect to be freed soon.
+  // Returned by ReportTempFileIO.
+  struct [[nodiscard]] TempFileIOMark {
+    QueryContext* ctx_;
+    size_t bytes_;
+
+    TempFileIOMark(QueryContext* ctx, size_t bytes) : ctx_(ctx), bytes_(bytes) {
+      ctx_->in_flight_bytes_to_disk_.fetch_add(bytes_, std::memory_order_acquire);
+    }
+
+    ~TempFileIOMark() {
+      ctx_->in_flight_bytes_to_disk_.fetch_sub(bytes_, std::memory_order_release);
+    }
+  };
+
+  TempFileIOMark ReportTempFileIO(size_t bytes) { return {this, bytes}; }
+
+  size_t GetCurrentTempFileIO() { return in_flight_bytes_to_disk_.load(); }
+
+ private:
+  QueryOptions options_;
+  // To be replaced with Acero-specific context once scheduler is done and
+  // we don't need ExecContext for kernels
+  ExecContext exec_context_;
+  IOContext io_context_;
+
+  std::unique_ptr<util::AsyncTaskScheduler> async_scheduler_ =
+      util::AsyncTaskScheduler::Make();

Review Comment:
   This will need rebase as there is no longer a unique_ptr to an async task scheduler (it is owned by itself and destroyed when all tasks have finished).



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {

Review Comment:
   This won't be "user facing" but it will be pretty relevant to anyone working on exec nodes.  We might want some docstrings explaining what these various fields are.  This could be deferred to a more general purpose "create documentation for custom node authors" PR though.



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;

Review Comment:
   I agree we will want this but let's remove it until we're ready to support it in some way (sorry, I guess you probably moved it here from the hash spilling PR and now it will have to move back :grimacing: )



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {
+ public:
+  QueryContext(QueryOptions opts = {},
+               ExecContext exec_context = *default_exec_context());
+
+  Status Init(size_t max_num_threads);
+
+  const ::arrow::internal::CpuInfo* cpu_info() const;
+  int64_t hardware_flags() const { return cpu_info()->hardware_flags(); }
+  const QueryOptions& options() const { return options_; }
+  MemoryPool* memory_pool() const { return exec_context_.memory_pool(); }
+  ::arrow::internal::Executor* executor() const { return exec_context_.executor(); }
+  ExecContext* exec_context() { return &exec_context_; }
+  IOContext* io_context() { return &io_context_; }
+  TaskScheduler* scheduler() { return task_scheduler_.get(); }
+  util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }
+
+  size_t GetThreadIndex();
+  size_t max_concurrency() const;
+  Result<util::TempVectorStack*> GetTempStack(size_t thread_index);
+
+  /// \brief Start an external task
+  ///
+  /// This should be avoided if possible.  It is kept in for now for legacy
+  /// purposes.  This should be called before the external task is started.  If
+  /// a valid future is returned then it should be marked complete when the
+  /// external task has finished.
+  ///
+  /// \return an invalid future if the plan has already ended, otherwise this
+  ///         returns a future that must be completed when the external task
+  ///         finishes.
+  Result<Future<>> BeginExternalTask();
+
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  /// \brief Add a single function as a task to the query's task group on
+  ///        the IO thread pool
+  ///
+  /// \param fn The task to run. Returns a status.
+  Status ScheduleIOTask(std::function<Status()> fn);
+
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Register a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+
+  /// \brief Start the task group with the specified ID. This can only
+  ///        be called once per task_group_id.
+  ///
+  /// \param task_group_id The ID  of the task group to run
+  /// \param num_tasks The number of times to run the task
+  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+
+  // This is an RAII class for keeping track of in-flight file IO. Useful for getting
+  // an estimate of memory use, and how much memory we expect to be freed soon.
+  // Returned by ReportTempFileIO.
+  struct [[nodiscard]] TempFileIOMark {

Review Comment:
   Add `ARROW_DISALLOW_COPY_AND_ASSIGN(TempFileIOMark);`?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/util/async_util.h"
+
+#pragma once
+
+namespace arrow {
+namespace internal {
+class CpuInfo;
+}
+
+using io::IOContext;
+namespace compute {
+struct ARROW_EXPORT QueryOptions {
+  QueryOptions();
+  // 0 means unlimited
+  size_t max_memory_bytes;
+
+  /// \brief Should the plan use a legacy batching strategy
+  ///
+  /// This is currently in place only to support the Scanner::ToTable
+  /// method.  This method relies on batch indices from the scanner
+  /// remaining consistent.  This is impractical in the ExecPlan which
+  /// might slice batches as needed (e.g. for a join)
+  ///
+  /// However, it still works for simple plans and this is the only way
+  /// we have at the moment for maintaining implicit order.
+  bool use_legacy_batching;
+};
+
+class ARROW_EXPORT QueryContext {
+ public:
+  QueryContext(QueryOptions opts = {},
+               ExecContext exec_context = *default_exec_context());
+
+  Status Init(size_t max_num_threads);
+
+  const ::arrow::internal::CpuInfo* cpu_info() const;
+  int64_t hardware_flags() const { return cpu_info()->hardware_flags(); }
+  const QueryOptions& options() const { return options_; }
+  MemoryPool* memory_pool() const { return exec_context_.memory_pool(); }
+  ::arrow::internal::Executor* executor() const { return exec_context_.executor(); }
+  ExecContext* exec_context() { return &exec_context_; }
+  IOContext* io_context() { return &io_context_; }
+  TaskScheduler* scheduler() { return task_scheduler_.get(); }
+  util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }
+
+  size_t GetThreadIndex();
+  size_t max_concurrency() const;
+  Result<util::TempVectorStack*> GetTempStack(size_t thread_index);
+
+  /// \brief Start an external task
+  ///
+  /// This should be avoided if possible.  It is kept in for now for legacy
+  /// purposes.  This should be called before the external task is started.  If
+  /// a valid future is returned then it should be marked complete when the
+  /// external task has finished.
+  ///
+  /// \return an invalid future if the plan has already ended, otherwise this
+  ///         returns a future that must be completed when the external task
+  ///         finishes.
+  Result<Future<>> BeginExternalTask();
+
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+  /// \brief Add a single function as a task to the query's task group
+  ///        on the compute threadpool.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  /// \brief Add a single function as a task to the query's task group on
+  ///        the IO thread pool
+  ///
+  /// \param fn The task to run. Returns a status.
+  Status ScheduleIOTask(std::function<Status()> fn);
+
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Register a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+
+  /// \brief Start the task group with the specified ID. This can only
+  ///        be called once per task_group_id.
+  ///
+  /// \param task_group_id The ID  of the task group to run
+  /// \param num_tasks The number of times to run the task
+  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+
+  // This is an RAII class for keeping track of in-flight file IO. Useful for getting
+  // an estimate of memory use, and how much memory we expect to be freed soon.
+  // Returned by ReportTempFileIO.
+  struct [[nodiscard]] TempFileIOMark {
+    QueryContext* ctx_;
+    size_t bytes_;
+
+    TempFileIOMark(QueryContext* ctx, size_t bytes) : ctx_(ctx), bytes_(bytes) {
+      ctx_->in_flight_bytes_to_disk_.fetch_add(bytes_, std::memory_order_acquire);
+    }
+
+    ~TempFileIOMark() {
+      ctx_->in_flight_bytes_to_disk_.fetch_sub(bytes_, std::memory_order_release);
+    }

Review Comment:
   I could very well be wrong but, I think, if you're simplly using `in_flight_bytes_to_disk_` as a counter, then you can do relaxed memory ordering.



##########
cpp/src/arrow/compute/exec/query_context.cc:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/query_context.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/io_util.h"
+
+namespace arrow {
+using internal::CpuInfo;
+namespace compute {
+QueryOptions::QueryOptions()
+    : max_memory_bytes(
+          static_cast<size_t>(0.75f * ::arrow::internal::GetTotalMemoryBytes())),
+      use_legacy_batching(false) {}
+
+QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
+    : options_(opts),
+      exec_context_(exec_context),
+      io_context_(exec_context_.memory_pool()) {}
+
+const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); }
+
+Status QueryContext::Init(size_t max_num_threads) {
+  tld_.resize(max_num_threads);
+  return Status::OK();
+}
+
+size_t QueryContext::GetThreadIndex() { return thread_indexer_(); }
+
+size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); }

Review Comment:
   Is `thread_indexer_` based on the `exec_context.executor()` somehow?  I kind of expected this to be `exec_context.executor()->Capacity()`



##########
cpp/src/arrow/dataset/scanner.cc:
##########
@@ -408,8 +408,11 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
   auto exec_context =
       std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
 
-  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
-  plan->SetUseLegacyBatching(use_legacy_batching);
+  compute::QueryOptions query_options;
+  query_options.use_legacy_batching = true;

Review Comment:
   ```suggestion
     query_options.use_legacy_batching = use_legacy_batching;
   ```



##########
cpp/src/arrow/compute/exec/query_context.cc:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/query_context.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/io_util.h"
+
+namespace arrow {
+using internal::CpuInfo;
+namespace compute {
+QueryOptions::QueryOptions()
+    : max_memory_bytes(
+          static_cast<size_t>(0.75f * ::arrow::internal::GetTotalMemoryBytes())),
+      use_legacy_batching(false) {}
+
+QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
+    : options_(opts),
+      exec_context_(exec_context),
+      io_context_(exec_context_.memory_pool()) {}

Review Comment:
   This means we are always using the default I/O thread pool...which is probably ok for now.  I'm not sure who is supposed to be providing the I/O context anyways (e.g. filesystem?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org