You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original) was not preserved in this copy.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/05/31 00:14:17 UTC

[arrow-julia] branch main updated: Base.isdone for Stream (#428)

This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new d1b5326  Base.isdone for Stream (#428)
d1b5326 is described below

commit d1b53263cd73a4a8fb61cfb7c0392a92239fe43e
Author: Ben Baumgold <49...@users.noreply.github.com>
AuthorDate: Tue May 30 20:14:11 2023 -0400

    Base.isdone for Stream (#428)
    
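    Implement Base.isdone for Arrow.Stream. For the definitions of
    Base.isdone and Base.isempty in Julia 1.8.5, see:
    
    https://github.com/JuliaLang/julia/blob/v1.8.5/base/essentials.jl#L882-L895
    
    This enables calling `Base.isempty(::Arrow.Stream)` without affecting
    the state of the Arrow.Stream object. When `Base.isdone` returns a value
    other than `missing`, `Base.isempty` returns that value directly. It
    therefore does not need to call `iterate`, which would advance the stream.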
---
 src/table.jl     | 34 ++++++++++++++++++++++++++++++----
 test/runtests.jl |  1 +
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index ec9d17c..da23038 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -87,9 +87,25 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
     Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
 end
 
-Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
-Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
-Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
+function Stream(input, pos::Integer=1, len=nothing; kw...)
+    b = tobytes(input)
+    isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
+end
+
+function Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...)
+    b = tobytes(input)
+    isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
+end
+
+function Stream(inputs::AbstractVector; kw...)
+    blobs = ArrowBlob[]
+    for x in inputs
+        b = tobytes(x)
+        isempty(b) && continue
+        push!(blobs, ArrowBlob(b, 1, nothing))
+    end
+    Stream(blobs; kw...)
+end
 
 function initialize!(x::Stream)
     isempty(getfield(x, :names)) || return
@@ -116,8 +132,14 @@ end
 
 Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
 Base.eltype(::Type{Stream}) = Table
+Base.isdone(x::Stream) = x.inputindex > length(x.inputs)
 
 function Base.iterate(x::Stream, (pos, id)=(1, 0))
+    if Base.isdone(x)
+        x.inputindex = 1
+        x.batchiterator = nothing
+        return nothing
+    end
     if isnothing(x.batchiterator)
         blob = x.inputs[x.inputindex]
         x.batchiterator = BatchIterator(blob)
@@ -132,7 +154,11 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
         # check for additional inputs
         while state === nothing
             x.inputindex += 1
-            x.inputindex > length(x.inputs) && return nothing
+            if Base.isdone(x)
+                x.inputindex = 1
+                x.batchiterator = nothing
+                return nothing
+            end
             blob = x.inputs[x.inputindex]
             x.batchiterator = BatchIterator(blob)
             pos = x.batchiterator.startpos
diff --git a/test/runtests.jl b/test/runtests.jl
index 8a8bccd..dfac2c2 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -122,6 +122,7 @@ tt = Arrow.Table(io)
 seekstart(io)
 str = Arrow.Stream(io)
 @test eltype(str) == Arrow.Table
+@test !Base.isdone(str)
 state = iterate(str)
 @test state !== nothing
 tt, st = state