You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/06/13 23:09:04 UTC
[arrow-julia] 01/01: Use `@wkspawn` from ConcurrentUtilities instead of `Threads.@spawn`
This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch jq-wkspawn
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
commit 61bee0fcd50fbe2ba5b097735dfd72aff36f76cb
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Tue Jun 13 17:05:55 2023 -0600
Use `@wkspawn` from ConcurrentUtilities instead of `Threads.@spawn`
Should fix #336.
For more context, see the [same fix](https://github.com/JuliaData/CSV.jl/commit/077e177e359c0b58e2d6db0b660a235a2ee54228)
we made for this in CSV.jl.
Basically, objects interpolated into or returned from spawned tasks can
be unexpectedly kept alive long after the task has finished and the object
should have been garbage-collected due to individual threads holding
the most recent task as a reference. Using `@wkspawn` ensures tasks themselves
don't hold references to any of these once they're done executing.
---
src/append.jl | 4 ++--
src/table.jl | 4 ++--
src/write.jl | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/src/append.jl b/src/append.jl
index db7f1d3..a3c620f 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -130,7 +130,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
blocks = (Block[], Block[])
# start message writing from channel
threaded = ntasks > 1
- tsk = threaded ? (Threads.@spawn for msg in msgs
+ tsk = threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
@@ -151,7 +151,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
end
if threaded
- Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
+ @wkspawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
else
@async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
end
diff --git a/src/table.jl b/src/table.jl
index e61445c..daa6566 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -358,7 +358,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
- tsk = Threads.@spawn begin
+ tsk = @wkspawn begin
i = 1
for cols in tsks
if i == 1
@@ -425,7 +425,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debugv 1 "parsing record batch message: compression = $(header.compression)"
- Threads.@spawn begin
+ @wkspawn begin
cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
diff --git a/src/write.jl b/src/write.jl
index a6bd40b..6f14e5e 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -147,7 +147,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,Symbol,LZ4Fram
blocks = (Block[], Block[])
# start message writing from channel
threaded = Threads.nthreads() > 1
- task = threaded ? (Threads.@spawn for msg in msgs
+ task = threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
@@ -202,7 +202,7 @@ function write(writer::Writer, source)
put!(writer.msgs, recbatchmsg)
else
if writer.threaded
- Threads.@spawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
+ @wkspawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
else
@async process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
end