You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/06/14 12:16:54 UTC
[arrow-julia] branch main updated: Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)
This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new f8d2203 Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)
f8d2203 is described below
commit f8d2203b07380e1423723b5bfe32356aa1239284
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Wed Jun 14 06:16:49 2023 -0600
Use wkspawn from ConcurrentUtilities instead of Threads.spawn (#469)
Should fix #336.
For more context, see the [same
fix](https://github.com/JuliaData/CSV.jl/commit/077e177e359c0b58e2d6db0b660a235a2ee54228)
we made for this in CSV.jl.
Basically, objects interpolated into or returned from spawned tasks can
be unexpectedly kept alive long after the task has finished — even once
the object should have been garbage-collected — because individual
threads hold a reference to the most recent task they ran. Using
`@wkspawn` ensures tasks themselves don't hold references to any of
these objects once they're done executing.
---
src/append.jl | 4 ++--
src/table.jl | 4 ++--
src/write.jl | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/src/append.jl b/src/append.jl
index c0c663c..5a9c259 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -169,7 +169,7 @@ function append(
# start message writing from channel
threaded = ntasks > 1
tsk =
- threaded ? (Threads.@spawn for msg in msgs
+ threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
@@ -191,7 +191,7 @@ function append(
end
if threaded
- Threads.@spawn process_partition(
+ @wkspawn process_partition(
tbl_cols,
dictencodings,
largelists,
diff --git a/src/table.jl b/src/table.jl
index bbb3825..882a99b 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -419,7 +419,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
- tsk = Threads.@spawn begin
+ tsk = @wkspawn begin
i = 1
for cols in tsks
if i == 1
@@ -522,7 +522,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debugv 1 "parsing record batch message: compression = $(header.compression)"
- Threads.@spawn begin
+ @wkspawn begin
cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
diff --git a/src/write.jl b/src/write.jl
index 1376ee3..3db8dc9 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -167,7 +167,7 @@ function Base.open(
# start message writing from channel
threaded = Threads.nthreads() > 1
task =
- threaded ? (Threads.@spawn for msg in msgs
+ threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
@@ -296,7 +296,7 @@ function write(writer::Writer, source)
put!(writer.msgs, recbatchmsg)
else
if writer.threaded
- Threads.@spawn process_partition(
+ @wkspawn process_partition(
tblcols,
writer.dictencodings,
writer.largelists,