You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/05/30 18:52:38 UTC
[arrow-julia] branch main updated: Refactor compressors/decompressors for laziness + safety (#445)
This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new 6fe4ec0 Refactor compressors/decompressors for laziness + safety (#445)
6fe4ec0 is described below
commit 6fe4ec0058657a5fe9a18b226a54a03a56d5bf81
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Tue May 30 12:52:33 2023 -0600
Refactor compressors/decompressors for laziness + safety (#445)
Fixes #396.
As noted in the originally reported issue, enabling debug logging when
writing arrow data with compression can result in segfaults: the
underlying CodecX packages emit debug logs, which can cause task
switches/migration. That makes the pattern of using a single
`X_COMPRESSOR` array indexed by `Threads.threadid()` unsafe, since
multiple tasks may end up using the same compressor at the same time.
We fix this by wrapping each compressor in a `Lockable` and ensuring the
`compress` (or `uncompress`) operation holds the lock for the duration
of the operation. We also:
* Cache one decompressor per thread instead of constructing a new one
for every buffer decompressed during reading
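* Lazily initialize compressors/decompressors in a way that is safe on
Julia 1.9+ (the per-thread storage is sized by `Threads.maxthreadid()`
rather than `Threads.nthreads()`) and only creates each object when a
specific thread first needs it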
* Switch from WorkerUtilities -> ConcurrentUtilities (the package was
renamed)
Successor to https://github.com/apache/arrow-julia/pull/397; I've added
@svilupp as a co-author here since they started the original effort to
move the code in this direction.
---------
Co-authored-by: J S <49...@users.noreply.github.com>
---
Project.toml | 4 +--
src/Arrow.jl | 65 +++++++++++++++++++++++++++++++++++++-------
src/append.jl | 8 +-----
src/arraytypes/arraytypes.jl | 16 +++++++----
src/table.jl | 10 +++++--
src/write.jl | 20 ++++----------
6 files changed, 83 insertions(+), 40 deletions(-)
diff --git a/Project.toml b/Project.toml
index 5f75563..ed76b2f 100644
--- a/Project.toml
+++ b/Project.toml
@@ -24,6 +24,7 @@ ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
BitIntegers = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1"
CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
+ConcurrentUtilities = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
EnumX = "4e289a0a-7415-4d19-859d-a7e5c4648b56"
@@ -35,13 +36,13 @@ Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
-WorkerUtilities = "76eceee3-57b5-4d4a-8e66-0e911cebbf60"
[compat]
ArrowTypes = "1.1,2"
BitIntegers = "0.2, 0.3"
CodecLz4 = "0.4"
CodecZstd = "0.7"
+ConcurrentUtilities = "2"
DataAPI = "1"
EnumX = "1"
FilePathsBase = "0.9"
@@ -51,7 +52,6 @@ SentinelArrays = "1"
Tables = "1.1"
TimeZones = "1"
TranscodingStreams = "0.9.12"
-WorkerUtilities = "1.1"
julia = "1.6"
[extras]
diff --git a/src/Arrow.jl b/src/Arrow.jl
index 01c7993..9d5a7fb 100644
--- a/src/Arrow.jl
+++ b/src/Arrow.jl
@@ -45,7 +45,7 @@ using Base.Iterators
using Mmap
using LoggingExtras
import Dates
-using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers, WorkerUtilities
+using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers, ConcurrentUtilities
export ArrowTypes
@@ -71,18 +71,63 @@ include("write.jl")
include("append.jl")
include("show.jl")
-const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
-const ZSTD_COMPRESSOR = ZstdCompressor[]
+const ZSTD_COMPRESSOR = Lockable{ZstdCompressor}[]
+const ZSTD_DECOMPRESSOR = Lockable{ZstdDecompressor}[]
+const LZ4_FRAME_COMPRESSOR = Lockable{LZ4FrameCompressor}[]
+const LZ4_FRAME_DECOMPRESSOR = Lockable{LZ4FrameDecompressor}[]
+
+function init_zstd_compressor()
+ zstd = ZstdCompressor(; level=3)
+ CodecZstd.TranscodingStreams.initialize(zstd)
+ return Lockable(zstd)
+end
+
+function init_zstd_decompressor()
+ zstd = ZstdDecompressor()
+ CodecZstd.TranscodingStreams.initialize(zstd)
+ return Lockable(zstd)
+end
+
+function init_lz4_frame_compressor()
+ lz4 = LZ4FrameCompressor(; compressionlevel=4)
+ CodecLz4.TranscodingStreams.initialize(lz4)
+ return Lockable(lz4)
+end
+
+function init_lz4_frame_decompressor()
+ lz4 = LZ4FrameDecompressor()
+ CodecLz4.TranscodingStreams.initialize(lz4)
+ return Lockable(lz4)
+end
+
+function access_threaded(f, v::Vector)
+ tid = Threads.threadid()
+ 0 < tid <= length(v) || _length_assert()
+ if @inbounds isassigned(v, tid)
+ @inbounds x = v[tid]
+ else
+ x = f()
+ @inbounds v[tid] = x
+ end
+ return x
+end
+@noinline _length_assert() = @assert false "0 < tid <= v"
+
+zstd_compressor() = access_threaded(init_zstd_compressor, ZSTD_COMPRESSOR)
+zstd_decompressor() = access_threaded(init_zstd_decompressor, ZSTD_DECOMPRESSOR)
+lz4_frame_compressor() = access_threaded(init_lz4_frame_compressor, LZ4_FRAME_COMPRESSOR)
+lz4_frame_decompressor() = access_threaded(init_lz4_frame_decompressor, LZ4_FRAME_DECOMPRESSOR)
function __init__()
- for _ = 1:Threads.nthreads()
- zstd = ZstdCompressor(; level=3)
- CodecZstd.TranscodingStreams.initialize(zstd)
- push!(ZSTD_COMPRESSOR, zstd)
- lz4 = LZ4FrameCompressor(; compressionlevel=4)
- CodecLz4.TranscodingStreams.initialize(lz4)
- push!(LZ4_FRAME_COMPRESSOR, lz4)
+ nt = @static if isdefined(Base.Threads, :maxthreadid)
+ Threads.maxthreadid()
+ else
+ Threads.nthreads()
end
+ resize!(empty!(LZ4_FRAME_COMPRESSOR), nt)
+ resize!(empty!(ZSTD_COMPRESSOR), nt)
+ resize!(empty!(LZ4_FRAME_DECOMPRESSOR), nt)
+ resize!(empty!(ZSTD_DECOMPRESSOR), nt)
return
end
diff --git a/src/append.jl b/src/append.jl
index 4bd45b3..fdcd5e5 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -109,15 +109,9 @@ function append(io::IO, tbl;
if !isstream
throw(ArgumentError("append is supported only to files in arrow stream format"))
end
-
- if compress === :lz4
- compress = LZ4_FRAME_COMPRESSOR
- elseif compress === :zstd
- compress = ZSTD_COMPRESSOR
- elseif compress isa Symbol
+ if compress isa Symbol && compress !== :lz4 && compress !== :zstd
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end
-
append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
end
diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl
index 3bbfd0e..a3449f1 100644
--- a/src/arraytypes/arraytypes.jl
+++ b/src/arraytypes/arraytypes.jl
@@ -31,18 +31,24 @@ nullcount(x::ArrowVector) = validitybitmap(x).nc
getmetadata(x::ArrowVector) = x.metadata
Base.deleteat!(x::T, inds) where {T <: ArrowVector} = throw(ArgumentError("`$T` does not support `deleteat!`; arrow data is by nature immutable"))
-function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Vector{LZ4FrameCompressor}, LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
+function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, kw...)
@debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(values(kw))"
@debugv 3 x
A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionType.LZ4_FRAME, compression, A)
- elseif compression isa Vector{LZ4FrameCompressor}
- A = compress(Meta.CompressionType.LZ4_FRAME, compression[Threads.threadid()], A)
elseif compression isa ZstdCompressor
A = compress(Meta.CompressionType.ZSTD, compression, A)
- elseif compression isa Vector{ZstdCompressor}
- A = compress(Meta.CompressionType.ZSTD, compression[Threads.threadid()], A)
+ elseif compression isa Symbol && compression == :lz4
+ comp = lz4_frame_compressor()
+ A = Base.@lock comp begin
+ compress(Meta.CompressionType.LZ4_FRAME, comp[], A)
+ end
+ elseif compression isa Symbol && compression == :zstd
+ comp = zstd_compressor()
+ A = Base.@lock comp begin
+ compress(Meta.CompressionType.ZSTD, comp[], A)
+ end
end
@debugv 2 "converted top-level column to arrow format: $(typeof(A))"
@debugv 3 A
diff --git a/src/table.jl b/src/table.jl
index b2d982d..ec9d17c 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -558,9 +558,15 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression)
end
decodedbytes = Vector{UInt8}(undef, len)
if compression.codec === Meta.CompressionType.LZ4_FRAME
- transcode(LZ4FrameDecompressor, encodedbytes, decodedbytes)
+ comp = lz4_frame_decompressor()
+ Base.@lock comp begin
+ transcode(comp[], encodedbytes, decodedbytes)
+ end
elseif compression.codec === Meta.CompressionType.ZSTD
- transcode(ZstdDecompressor, encodedbytes, decodedbytes)
+ comp = zstd_decompressor()
+ Base.@lock comp begin
+ transcode(comp[], encodedbytes, decodedbytes)
+ end
else
error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
end
diff --git a/src/write.jl b/src/write.jl
index d982b09..a6bd40b 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -111,7 +111,7 @@ julia> open(Arrow.Writer, tempname()) do writer
mutable struct Writer{T<:IO}
io::T
closeio::Bool
- compress::Union{Nothing,LZ4FrameCompressor,Vector{LZ4FrameCompressor},ZstdCompressor,Vector{ZstdCompressor}}
+ compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}
writetofile::Bool
largelists::Bool
denseunions::Bool
@@ -135,7 +135,10 @@ mutable struct Writer{T<:IO}
isclosed::Bool
end
-function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}, writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where {T<:IO}
+function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}, writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where {T<:IO}
+ if compress isa Symbol && compress !== :lz4 && compress !== :zstd
+ throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
+ end
sync = OrderedSynchronizer(2)
msgs = Channel{Message}(ntasks)
schema = Ref{Tables.Schema}()
@@ -156,18 +159,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,LZ4FrameCompre
return Writer{T}(io, closeio, compress, writetofile, largelists, denseunions, dictencode, dictencodenested, threaded, alignment, maxdepth, meta, colmeta, sync, msgs, schema, firstcols, dictencodings, blocks, task, anyerror, errorref, 1, false)
end
-function Base.open(::Type{Writer}, io::IO, compress::Symbol, args...)
- compressor = if compress === :lz4
- LZ4_FRAME_COMPRESSOR
- elseif compress === :zstd
- ZSTD_COMPRESSOR
- else
- throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
- end
- open(Writer, io, compressor, args...)
-end
-
-function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool=false)
+function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool=false)
open(Writer, io, compress, file, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata, closeio)
end