You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/05/02 12:44:59 UTC

[arrow] branch master updated: ARROW-16417: [C++][Python] Segfault in test_exec_plan.py / test_joins

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7809c6d7cb ARROW-16417: [C++][Python] Segfault in test_exec_plan.py / test_joins
7809c6d7cb is described below

commit 7809c6d7cb9dcf327840b0f9db1bff436d381f29
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon May 2 08:44:27 2022 -0400

    ARROW-16417: [C++][Python] Segfault in test_exec_plan.py / test_joins
    
    This builds on top of #13035 which is also important for avoiding segmentation faults.  On top of that there were a few more problems:
    
     * The Python code was using `SourceNodeOptions::FromTable`, which is a rather dangerous method (mainly useful for unit testing) as it doesn't share ownership of the input table (even worse, it takes a const ref).  Python was not keeping the table alive, and it was maybe possible for the table to be deleted out from under the plan (I'm not entirely sure this was causing issues but it seemed risky).  I switched to TableSourceNode, which shares ownership of the table (and is a bit more efficient).
     * Setting use_threads to False did nothing because `_perform_join` was not passing the arg on to `execplan`.
     * When fixing the above and running with `use_threads=False`, it was creating a single-thread executor, but the current best practice is to pass in nullptr.
     * Finally, the actual bug was my improper fix in #12894.  I had still left a small window open for `End` to be called between `Submit` and `AddTask`, which would allow the task to be submitted but not participate in setting `finished` on the node.
    
    Closes #13036 from westonpace/bugfix/ARROW-16417--segfault-in-python-join
    
    Lead-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/compute/exec/hash_join_node.cc | 21 +++++++++++----------
 python/pyarrow/_exec_plan.pyx                | 22 +++++++++++-----------
 python/pyarrow/includes/libarrow.pxd         |  5 ++---
 3 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index d28e3aeda4..0282e387c4 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -634,16 +634,17 @@ class HashJoinNode : public ExecNode {
   Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
     auto executor = plan_->exec_context()->executor();
     if (executor) {
-      ARROW_ASSIGN_OR_RAISE(auto task_fut, executor->Submit([this, func] {
-        size_t thread_index = thread_indexer_();
-        Status status = func(thread_index);
-        if (!status.ok()) {
-          StopProducing();
-          ErrorIfNotOk(status);
-          return;
-        }
-      }));
-      return task_group_.AddTask(task_fut);
+      return task_group_.AddTask([this, executor, func] {
+        return DeferNotOk(executor->Submit([this, func] {
+          size_t thread_index = thread_indexer_();
+          Status status = func(thread_index);
+          if (!status.ok()) {
+            StopProducing();
+            ErrorIfNotOk(status);
+            return;
+          }
+        }));
+      });
     } else {
       // We should not get here in serial execution mode
       ARROW_DCHECK(false);
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 909d12ed2c..7cbce9baa6 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -56,7 +56,6 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     """
     cdef:
         CExecutor *c_executor
-        shared_ptr[CThreadPool] c_executor_sptr
         shared_ptr[CExecContext] c_exec_context
         shared_ptr[CExecPlan] c_exec_plan
         vector[CDeclaration] c_decls
@@ -64,8 +63,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
         vector[CExecNode*] c_final_node_vec
         CExecNode *c_node
         CTable* c_table
+        shared_ptr[CTable] c_in_table
         shared_ptr[CTable] c_out_table
-        shared_ptr[CSourceNodeOptions] c_sourceopts
+        shared_ptr[CTableSourceNodeOptions] c_tablesourceopts
         shared_ptr[CScanNodeOptions] c_scanopts
         shared_ptr[CExecNodeOptions] c_input_node_opts
         shared_ptr[CSinkNodeOptions] c_sinkopts
@@ -78,8 +78,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     if use_threads:
         c_executor = GetCpuThreadPool()
     else:
-        c_executor_sptr = GetResultValue(CThreadPool.Make(1))
-        c_executor = c_executor_sptr.get()
+        c_executor = NULL
 
     c_exec_context = make_shared[CExecContext](
         c_default_memory_pool(), c_executor)
@@ -90,12 +89,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "source"
-            c_in_table = pyarrow_unwrap_table(ipt).get()
-            c_sourceopts = GetResultValue(
-                CSourceNodeOptions.FromTable(deref(c_in_table), deref(c_exec_context).executor()))
-            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions](
-                c_sourceopts)
+            node_factory = "table_source"
+            c_in_table = pyarrow_unwrap_table(ipt)
+            c_tablesourceopts = make_shared[CTableSourceNodeOptions](
+                c_in_table, 1 << 20)
+            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
+                c_tablesourceopts)
         elif isinstance(ipt, Dataset):
             node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
@@ -348,6 +347,7 @@ def _perform_join(join_type, left_operand not None, left_keys,
 
     result_table = execplan([left_operand, right_operand],
                             plan=c_decl_plan,
-                            output_type=output_type)
+                            output_type=output_type,
+                            use_threads=use_threads)
 
     return result_table
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index e34bc7a28f..cc52102ef8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2447,9 +2447,8 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
     cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions":
         pass
 
-    cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions):
-        @staticmethod
-        CResult[shared_ptr[CSourceNodeOptions]] FromTable(const CTable& table, CExecutor*)
+    cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions):
+        CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size)
 
     cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
         pass