You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/05/30 18:52:38 UTC

[arrow-julia] branch main updated: Refactor compressors/decompressors for laziness + safety (#445)

This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fe4ec0  Refactor compressors/decompressors for laziness + safety (#445)
6fe4ec0 is described below

commit 6fe4ec0058657a5fe9a18b226a54a03a56d5bf81
Author: Jacob Quinn <qu...@gmail.com>
AuthorDate: Tue May 30 12:52:33 2023 -0600

    Refactor compressors/decompressors for laziness + safety (#445)
    
    Fixes #396.
    
    As noted in the originally reported issue, enabling debug logging when
    writing arrow data with compression can result in segfaults because the
    underlying CodecX packages have debug logs, causing task
    switches/migration and thus making the pattern of using a single
    `X_COMPRESSOR` array indexed by `Threads.threadid()` unsafe since
    multiple threads may try using the same compressor at the same time.
    
    We fix this by wrapping each compressor in a `Lockable` and ensuring the
    `compress` (or `uncompress`) operation holds the lock for the duration
    of the operation. We also:
    * Add a decompressor per thread to avoid recreating one over and over
    during reading
    * Lazily initialize compressors/decompressors in a way that is safe on
    Julia 1.9+ and only creates the object when needed by a specific thread
    * Switch from WorkerUtilities -> ConcurrentUtilities (the package was
    renamed)
    
    Successor to https://github.com/apache/arrow-julia/pull/397; I've added
    @svilupp as a co-author here since they started the original movement
    for the code to go in this direction.
    
    ---------
    
    Co-authored-by: J S <49...@users.noreply.github.com>
---
 Project.toml                 |  4 +--
 src/Arrow.jl                 | 65 +++++++++++++++++++++++++++++++++++++-------
 src/append.jl                |  8 +-----
 src/arraytypes/arraytypes.jl | 16 +++++++----
 src/table.jl                 | 10 +++++--
 src/write.jl                 | 20 ++++----------
 6 files changed, 83 insertions(+), 40 deletions(-)

diff --git a/Project.toml b/Project.toml
index 5f75563..ed76b2f 100644
--- a/Project.toml
+++ b/Project.toml
@@ -24,6 +24,7 @@ ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
 BitIntegers = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1"
 CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
 CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
+ConcurrentUtilities = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
 DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
 Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
 EnumX = "4e289a0a-7415-4d19-859d-a7e5c4648b56"
@@ -35,13 +36,13 @@ Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
 TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
 UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
-WorkerUtilities = "76eceee3-57b5-4d4a-8e66-0e911cebbf60"
 
 [compat]
 ArrowTypes = "1.1,2"
 BitIntegers = "0.2, 0.3"
 CodecLz4 = "0.4"
 CodecZstd = "0.7"
+ConcurrentUtilities = "2"
 DataAPI = "1"
 EnumX = "1"
 FilePathsBase = "0.9"
@@ -51,7 +52,6 @@ SentinelArrays = "1"
 Tables = "1.1"
 TimeZones = "1"
 TranscodingStreams = "0.9.12"
-WorkerUtilities = "1.1"
 julia = "1.6"
 
 [extras]
diff --git a/src/Arrow.jl b/src/Arrow.jl
index 01c7993..9d5a7fb 100644
--- a/src/Arrow.jl
+++ b/src/Arrow.jl
@@ -45,7 +45,7 @@ using Base.Iterators
 using Mmap
 using LoggingExtras
 import Dates
-using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers, WorkerUtilities
+using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers, ConcurrentUtilities
 
 export ArrowTypes
 
@@ -71,18 +71,63 @@ include("write.jl")
 include("append.jl")
 include("show.jl")
 
-const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
-const ZSTD_COMPRESSOR = ZstdCompressor[]
+const ZSTD_COMPRESSOR = Lockable{ZstdCompressor}[]
+const ZSTD_DECOMPRESSOR = Lockable{ZstdDecompressor}[]
+const LZ4_FRAME_COMPRESSOR = Lockable{LZ4FrameCompressor}[]
+const LZ4_FRAME_DECOMPRESSOR = Lockable{LZ4FrameDecompressor}[]
+
+function init_zstd_compressor()
+    zstd = ZstdCompressor(; level=3)
+    CodecZstd.TranscodingStreams.initialize(zstd)
+    return Lockable(zstd)
+end
+
+function init_zstd_decompressor()
+    zstd = ZstdDecompressor()
+    CodecZstd.TranscodingStreams.initialize(zstd)
+    return Lockable(zstd)
+end
+
+function init_lz4_frame_compressor()
+    lz4 = LZ4FrameCompressor(; compressionlevel=4)
+    CodecLz4.TranscodingStreams.initialize(lz4)
+    return Lockable(lz4)
+end
+
+function init_lz4_frame_decompressor()
+    lz4 = LZ4FrameDecompressor()
+    CodecLz4.TranscodingStreams.initialize(lz4)
+    return Lockable(lz4)
+end
+
+function access_threaded(f, v::Vector)
+    tid = Threads.threadid()
+    0 < tid <= length(v) || _length_assert()
+    if @inbounds isassigned(v, tid)
+        @inbounds x = v[tid]
+    else
+        x = f()
+        @inbounds v[tid] = x
+    end
+    return x
+end
+@noinline _length_assert() =  @assert false "0 < tid <= v"
+
+zstd_compressor() = access_threaded(init_zstd_compressor, ZSTD_COMPRESSOR)
+zstd_decompressor() = access_threaded(init_zstd_decompressor, ZSTD_DECOMPRESSOR)
+lz4_frame_compressor() = access_threaded(init_lz4_frame_compressor, LZ4_FRAME_COMPRESSOR)
+lz4_frame_decompressor() = access_threaded(init_lz4_frame_decompressor, LZ4_FRAME_DECOMPRESSOR)
 
 function __init__()
-    for _ = 1:Threads.nthreads()
-        zstd = ZstdCompressor(; level=3)
-        CodecZstd.TranscodingStreams.initialize(zstd)
-        push!(ZSTD_COMPRESSOR, zstd)
-        lz4 = LZ4FrameCompressor(; compressionlevel=4)
-        CodecLz4.TranscodingStreams.initialize(lz4)
-        push!(LZ4_FRAME_COMPRESSOR, lz4)
+    nt = @static if isdefined(Base.Threads, :maxthreadid)
+        Threads.maxthreadid()
+    else
+        Threads.nthreads()
     end
+    resize!(empty!(LZ4_FRAME_COMPRESSOR), nt)
+    resize!(empty!(ZSTD_COMPRESSOR), nt)
+    resize!(empty!(LZ4_FRAME_DECOMPRESSOR), nt)
+    resize!(empty!(ZSTD_DECOMPRESSOR), nt)
     return
 end
 
diff --git a/src/append.jl b/src/append.jl
index 4bd45b3..fdcd5e5 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -109,15 +109,9 @@ function append(io::IO, tbl;
         if !isstream
             throw(ArgumentError("append is supported only to files in arrow stream format"))
         end
-
-        if compress === :lz4
-            compress = LZ4_FRAME_COMPRESSOR
-        elseif compress === :zstd
-            compress = ZSTD_COMPRESSOR
-        elseif compress isa Symbol
+        if compress isa Symbol && compress !== :lz4 && compress !== :zstd
             throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
         end
-
         append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
     end
 
diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl
index 3bbfd0e..a3449f1 100644
--- a/src/arraytypes/arraytypes.jl
+++ b/src/arraytypes/arraytypes.jl
@@ -31,18 +31,24 @@ nullcount(x::ArrowVector) = validitybitmap(x).nc
 getmetadata(x::ArrowVector) = x.metadata
 Base.deleteat!(x::T, inds) where {T <: ArrowVector} = throw(ArgumentError("`$T` does not support `deleteat!`; arrow data is by nature immutable"))
 
-function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Vector{LZ4FrameCompressor}, LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
+function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, kw...)
     @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(values(kw))"
     @debugv 3 x
     A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
     if compression isa LZ4FrameCompressor
         A = compress(Meta.CompressionType.LZ4_FRAME, compression, A)
-    elseif compression isa Vector{LZ4FrameCompressor}
-        A = compress(Meta.CompressionType.LZ4_FRAME, compression[Threads.threadid()], A)
     elseif compression isa ZstdCompressor
         A = compress(Meta.CompressionType.ZSTD, compression, A)
-    elseif compression isa Vector{ZstdCompressor}
-        A = compress(Meta.CompressionType.ZSTD, compression[Threads.threadid()], A)
+    elseif compression isa Symbol && compression == :lz4
+        comp = lz4_frame_compressor()
+        A = Base.@lock comp begin
+            compress(Meta.CompressionType.LZ4_FRAME, comp[], A)
+        end
+    elseif compression isa Symbol && compression == :zstd
+        comp = zstd_compressor()
+        A = Base.@lock comp begin
+            compress(Meta.CompressionType.ZSTD, comp[], A)
+        end
     end
     @debugv 2 "converted top-level column to arrow format: $(typeof(A))"
     @debugv 3 A
diff --git a/src/table.jl b/src/table.jl
index b2d982d..ec9d17c 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -558,9 +558,15 @@ function uncompress(ptr::Ptr{UInt8}, buffer, compression)
     end
     decodedbytes = Vector{UInt8}(undef, len)
     if compression.codec === Meta.CompressionType.LZ4_FRAME
-        transcode(LZ4FrameDecompressor, encodedbytes, decodedbytes)
+        comp = lz4_frame_decompressor()
+        Base.@lock comp begin
+            transcode(comp[], encodedbytes, decodedbytes)
+        end
     elseif compression.codec === Meta.CompressionType.ZSTD
-        transcode(ZstdDecompressor, encodedbytes, decodedbytes)
+        comp = zstd_decompressor()
+        Base.@lock comp begin
+            transcode(comp[], encodedbytes, decodedbytes)
+        end
     else
         error("unsupported compression type when reading arrow buffers: $(typeof(compression.codec))")
     end
diff --git a/src/write.jl b/src/write.jl
index d982b09..a6bd40b 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -111,7 +111,7 @@ julia> open(Arrow.Writer, tempname()) do writer
 mutable struct Writer{T<:IO}
     io::T
     closeio::Bool
-    compress::Union{Nothing,LZ4FrameCompressor,Vector{LZ4FrameCompressor},ZstdCompressor,Vector{ZstdCompressor}}
+    compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}
     writetofile::Bool
     largelists::Bool
     denseunions::Bool
@@ -135,7 +135,10 @@ mutable struct Writer{T<:IO}
     isclosed::Bool
 end
 
-function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}, writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where {T<:IO}
+function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}, writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where {T<:IO}
+    if compress isa Symbol && compress !== :lz4 && compress !== :zstd
+        throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
+    end
     sync = OrderedSynchronizer(2)
     msgs = Channel{Message}(ntasks)
     schema = Ref{Tables.Schema}()
@@ -156,18 +159,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,LZ4FrameCompre
     return Writer{T}(io, closeio, compress, writetofile, largelists, denseunions, dictencode, dictencodenested, threaded, alignment, maxdepth, meta, colmeta, sync, msgs, schema, firstcols, dictencodings, blocks, task, anyerror, errorref, 1, false)
 end
 
-function Base.open(::Type{Writer}, io::IO, compress::Symbol, args...)
-    compressor = if compress === :lz4
-        LZ4_FRAME_COMPRESSOR
-    elseif compress === :zstd
-        ZSTD_COMPRESSOR
-    else
-        throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
-    end
-    open(Writer, io, compressor, args...)
-end
-
-function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool=false)
+function Base.open(::Type{Writer}, io::IO; compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor}=nothing, file::Bool=true, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Integer=8, maxdepth::Integer=DEFAULT_MAX_DEPTH, ntasks::Integer=typemax(Int32), metadata::Union{Nothing,Any}=nothing, colmetadata::Union{Nothing,Any}=nothing, closeio::Bool=false)
     open(Writer, io, compress, file, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata, closeio)
 end