You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "baumgold (via GitHub)" <gi...@apache.org> on 2023/03/13 23:45:38 UTC

[GitHub] [arrow-julia] baumgold commented on a diff in pull request #397: Add locks for de-/compressor + initialize decompressor

baumgold commented on code in PR #397:
URL: https://github.com/apache/arrow-julia/pull/397#discussion_r1134720104


##########
src/arraytypes/arraytypes.jl:
##########
@@ -35,14 +35,19 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g
     @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"

Review Comment:
   This function should probably be broken down into a few smaller functions for clarity:
   
   ```julia
   
   toarrowvector(a::ArrowVector, compression::Nothing, kw...) = a
   
   function toarrowvector(a::ArrowVector, compression::AbstractVector{LZ4FrameCompressor}, kw...)
       tid=Threads.threadid()
       lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do
           compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], a)
       end
   end
   
   function toarrowvector(a::ArrowVector, compression::LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
       compress(Meta.CompressionTypes.LZ4_FRAME, compression, a)
   end
   
   function toarrowvector(a::ArrowVector, compression::AbstractVector{ZstdCompressor}, kw...)
       compress(Meta.CompressionTypes.ZSTD, compression, a)
   end
   
   function toarrowvector(a::ArrowVector, compression::ZstdCompressor, kw...)
       tid=Threads.threadid()
       lock(ZSTD_COMPRESSOR_LOCK[tid]) do
           compress(Meta.CompressionTypes.ZSTD, compression[tid], a)
       end
   end
   
   function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Vector{LZ4FrameCompressor}, LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
       @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
       @debugv 3 x
       A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
       A = toarrowvector(A, compression)
       @debugv 2 "converted top-level column to arrow format: $(typeof(A))"
       @debugv 3 A
       return A
   end
   
   
   ```



##########
src/Arrow.jl:
##########
@@ -73,6 +73,13 @@ include("show.jl")
 
 const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
 const ZSTD_COMPRESSOR = ZstdCompressor[]
+const LZ4_FRAME_DECOMPRESSOR = LZ4FrameDecompressor[]
+const ZSTD_DECOMPRESSOR = ZstdDecompressor[]
+# add locks for multithreaded (de/)compression (because we index by threadid which might not be safe under Julia >1.8)
+const LZ4_FRAME_COMPRESSOR_LOCK = ReentrantLock[]

Review Comment:
   A single thread can only do one thing at a time so maybe we can have a single lock per thread to be shared amongst all compressors/decompressors?



##########
src/Arrow.jl:
##########
@@ -82,6 +89,19 @@ function __init__()
         lz4 = LZ4FrameCompressor(; compressionlevel=4)
         CodecLz4.TranscodingStreams.initialize(lz4)
         push!(LZ4_FRAME_COMPRESSOR, lz4)
+        # Locks
+        push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock())
+        push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock())
+        # Decompressors
+        zstdd = ZstdDecompressor()
+        CodecZstd.TranscodingStreams.initialize(zstdd)

Review Comment:
   Maybe we should be a bit lazier about initializing these as I believe they're quite expensive.  Shall we only initialize them only as needed?  For example, if a user is only decompressing (reading) data then there's no reason to pay the penalty of initializing compressors (and vice versa).  Same idea for LZ4 vs ZSTD - if a user only uses one then no need to initialize both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org