You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/06/14 12:16:54 UTC

[arrow-julia] branch main updated: Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)

This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new f8d2203  Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)
f8d2203 is described below

commit f8d2203b07380e1423723b5bfe32356aa1239284
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Wed Jun 14 06:16:49 2023 -0600

    Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)
    
    Should fix #336.
    
    For more context, see the [equivalent
    fix](https://github.com/JuliaData/CSV.jl/commit/077e177e359c0b58e2d6db0b660a235a2ee54228)
    we made for the same problem in CSV.jl.
    
    Objects interpolated into, or returned from, spawned tasks can be kept
    alive long after the task has finished, well past the point where they
    should have been garbage-collected. This happens because each thread
    holds a reference to the most recent task it ran, which in turn keeps
    those objects reachable. Using `@wkspawn` ensures that tasks themselves
    don't hold references to any of these objects once they finish executing.
---
 src/append.jl | 4 ++--
 src/table.jl  | 4 ++--
 src/write.jl  | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/append.jl b/src/append.jl
index c0c663c..5a9c259 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -169,7 +169,7 @@ function append(
     # start message writing from channel
     threaded = ntasks > 1
     tsk =
-        threaded ? (Threads.@spawn for msg in msgs
+        threaded ? (@wkspawn for msg in msgs
             Base.write(io, msg, blocks, sch, alignment)
         end) : (@async for msg in msgs
             Base.write(io, msg, blocks, sch, alignment)
@@ -191,7 +191,7 @@ function append(
         end
 
         if threaded
-            Threads.@spawn process_partition(
+            @wkspawn process_partition(
                 tbl_cols,
                 dictencodings,
                 largelists,
diff --git a/src/table.jl b/src/table.jl
index bbb3825..882a99b 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -419,7 +419,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
     sync = OrderedSynchronizer()
     tsks = Channel{Any}(Inf)
-    tsk = Threads.@spawn begin
+    tsk = @wkspawn begin
         i = 1
         for cols in tsks
             if i == 1
@@ -522,7 +522,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
             elseif header isa Meta.RecordBatch
                 anyrecordbatches = true
                 @debugv 1 "parsing record batch message: compression = $(header.compression)"
-                Threads.@spawn begin
+                @wkspawn begin
                     cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
                     put!(() -> put!(tsks, cols), sync, $(rbi))
                 end
diff --git a/src/write.jl b/src/write.jl
index 1376ee3..3db8dc9 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -167,7 +167,7 @@ function Base.open(
     # start message writing from channel
     threaded = Threads.nthreads() > 1
     task =
-        threaded ? (Threads.@spawn for msg in msgs
+        threaded ? (@wkspawn for msg in msgs
             Base.write(io, msg, blocks, schema, alignment)
         end) : (@async for msg in msgs
             Base.write(io, msg, blocks, schema, alignment)
@@ -296,7 +296,7 @@ function write(writer::Writer, source)
             put!(writer.msgs, recbatchmsg)
         else
             if writer.threaded
-                Threads.@spawn process_partition(
+                @wkspawn process_partition(
                     tblcols,
                     writer.dictencodings,
                     writer.largelists,