You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/06/13 23:09:04 UTC

[arrow-julia] 01/01: Use wkspawn from ConcurrentUtilities instead of Threads.spawn

This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch jq-wkspawn
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git

commit 61bee0fcd50fbe2ba5b097735dfd72aff36f76cb
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Tue Jun 13 17:05:55 2023 -0600

    Use wkspawn from ConcurrentUtilities instead of Threads.spawn
    
    Should fix #336.
    
    For more context, see the [equivalent fix](https://github.com/JuliaData/CSV.jl/commit/077e177e359c0b58e2d6db0b660a235a2ee54228)
    we made in CSV.jl.
    
    Basically, objects interpolated into or returned from spawned tasks can
    be kept alive unexpectedly, long after the task has finished and after
    they should have been garbage-collected, because each thread keeps a
    reference to the most recent task it ran. Using `@wkspawn` ensures tasks
    themselves don't hold references to any of these once they're done executing.
---
 src/append.jl | 4 ++--
 src/table.jl  | 4 ++--
 src/write.jl  | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/append.jl b/src/append.jl
index db7f1d3..a3c620f 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -130,7 +130,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
     blocks = (Block[], Block[])
     # start message writing from channel
     threaded = ntasks > 1
-    tsk = threaded ? (Threads.@spawn for msg in msgs
+    tsk = threaded ? (@wkspawn for msg in msgs
         Base.write(io, msg, blocks, sch, alignment)
     end) : (@async for msg in msgs
         Base.write(io, msg, blocks, sch, alignment)
@@ -151,7 +151,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
         end
 
         if threaded
-            Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
+            @wkspawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
         else
             @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
         end
diff --git a/src/table.jl b/src/table.jl
index e61445c..daa6566 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -358,7 +358,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
     sync = OrderedSynchronizer()
     tsks = Channel{Any}(Inf)
-    tsk = Threads.@spawn begin
+    tsk = @wkspawn begin
         i = 1
         for cols in tsks
             if i == 1
@@ -425,7 +425,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
             elseif header isa Meta.RecordBatch
                 anyrecordbatches = true
                 @debugv 1 "parsing record batch message: compression = $(header.compression)"
-                Threads.@spawn begin
+                @wkspawn begin
                     cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
                     put!(() -> put!(tsks, cols), sync, $(rbi))
                 end
diff --git a/src/write.jl b/src/write.jl
index a6bd40b..6f14e5e 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -147,7 +147,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,Symbol,LZ4Fram
     blocks = (Block[], Block[])
     # start message writing from channel
     threaded = Threads.nthreads() > 1
-    task = threaded ? (Threads.@spawn for msg in msgs
+    task = threaded ? (@wkspawn for msg in msgs
         Base.write(io, msg, blocks, schema, alignment)
     end) : (@async for msg in msgs
         Base.write(io, msg, blocks, schema, alignment)
@@ -202,7 +202,7 @@ function write(writer::Writer, source)
             put!(writer.msgs, recbatchmsg)
         else
             if writer.threaded
-                Threads.@spawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
+                @wkspawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
             else
                 @async process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
             end