You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by qu...@apache.org on 2023/05/31 00:14:17 UTC
[arrow-julia] branch main updated: Base.isdone for Stream (#428)
This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new d1b5326 Base.isdone for Stream (#428)
d1b5326 is described below
commit d1b53263cd73a4a8fb61cfb7c0392a92239fe43e
Author: Ben Baumgold <49...@users.noreply.github.com>
AuthorDate: Tue May 30 20:14:11 2023 -0400
Base.isdone for Stream (#428)
Implement Base.isdone for Arrow.Stream
https://github.com/JuliaLang/julia/blob/v1.8.5/base/essentials.jl#L882-L895
This enables calling `Base.isempty(::Arrow.Stream)` without affecting
the state of the Arrow.Stream object.
---
src/table.jl | 34 ++++++++++++++++++++++++++++++----
test/runtests.jl | 1 +
2 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/src/table.jl b/src/table.jl
index ec9d17c..da23038 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -87,9 +87,25 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
end
-Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
-Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
-Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
+function Stream(input, pos::Integer=1, len=nothing; kw...)
+ b = tobytes(input)
+ isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
+end
+
+function Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...)
+ b = tobytes(input)
+ isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
+end
+
+function Stream(inputs::AbstractVector; kw...)
+ blobs = ArrowBlob[]
+ for x in inputs
+ b = tobytes(x)
+ isempty(b) && continue
+ push!(blobs, ArrowBlob(b, 1, nothing))
+ end
+ Stream(blobs; kw...)
+end
function initialize!(x::Stream)
isempty(getfield(x, :names)) || return
@@ -116,8 +132,14 @@ end
Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
Base.eltype(::Type{Stream}) = Table
+Base.isdone(x::Stream) = x.inputindex > length(x.inputs)
function Base.iterate(x::Stream, (pos, id)=(1, 0))
+ if Base.isdone(x)
+ x.inputindex = 1
+ x.batchiterator = nothing
+ return nothing
+ end
if isnothing(x.batchiterator)
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
@@ -132,7 +154,11 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
# check for additional inputs
while state === nothing
x.inputindex += 1
- x.inputindex > length(x.inputs) && return nothing
+ if Base.isdone(x)
+ x.inputindex = 1
+ x.batchiterator = nothing
+ return nothing
+ end
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
pos = x.batchiterator.startpos
diff --git a/test/runtests.jl b/test/runtests.jl
index 8a8bccd..dfac2c2 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -122,6 +122,7 @@ tt = Arrow.Table(io)
seekstart(io)
str = Arrow.Stream(io)
@test eltype(str) == Arrow.Table
+@test !Base.isdone(str)
state = iterate(str)
@test state !== nothing
tt, st = state