You are viewing a plain text version of this content. The canonical link for it is a hyperlink on the original archive page and is not included in this plain-text copy.
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/04/30 19:23:22 UTC
[arrow] branch master updated: ARROW-5173: [Go] handle multiple concatenated record batches
This is an automated email from the ASF dual-hosted git repository.
sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 30a473e ARROW-5173: [Go] handle multiple concatenated record batches
30a473e is described below
commit 30a473e67635dbcff891ed5bb7a2235b5035d171
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Tue Apr 30 21:22:19 2019 +0200
ARROW-5173: [Go] handle multiple concatenated record batches
Author: Sebastien Binet <bi...@cern.ch>
Closes #4160 from sbinet/issue-5173 and squashes the following commits:
623c3d2a <Sebastien Binet> ARROW-5173: handle multiple concatenated record batches
---
go/arrow/ipc/cmd/arrow-cat/main.go | 35 +++++++++++++++++++------------
go/arrow/ipc/cmd/arrow-ls/main.go | 43 +++++++++++++++++++++++++++++---------
2 files changed, 55 insertions(+), 23 deletions(-)
diff --git a/go/arrow/ipc/cmd/arrow-cat/main.go b/go/arrow/ipc/cmd/arrow-cat/main.go
index cb4ff5a..e5f75e7 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main.go
@@ -89,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(nil, 0)
- r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
- if err != nil {
- return err
- }
- defer r.Release()
-
- n := 0
- for r.Next() {
- n++
- fmt.Fprintf(w, "record %d...\n", n)
- rec := r.Record()
- for i, col := range rec.Columns() {
- fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col)
+ for {
+ r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
+ if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
+ return err
+ }
+ defer r.Release()
+
+ n := 0
+ for r.Next() {
+ n++
+ fmt.Fprintf(w, "record %d...\n", n)
+ rec := r.Record()
+ for i, col := range rec.Columns() {
+ fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col)
+ }
}
}
return nil
@@ -142,6 +147,9 @@ func processFile(w io.Writer, fname string) error {
r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
return err
}
defer r.Close()
@@ -159,6 +167,7 @@ func processFile(w io.Writer, fname string) error {
fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col)
}
}
+
return nil
}
diff --git a/go/arrow/ipc/cmd/arrow-ls/main.go b/go/arrow/ipc/cmd/arrow-ls/main.go
index b858bf5..cf9a765 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main.go
@@ -53,6 +53,7 @@
package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-ls"
import (
+ "bytes"
"flag"
"fmt"
"io"
@@ -63,6 +64,7 @@ import (
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
+ "github.com/pkg/errors"
)
func main() {
@@ -87,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(nil, 0)
- r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
- if err != nil {
- return err
- }
- defer r.Release()
+ for {
+ r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
+ if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
+ return err
+ }
+ defer r.Release()
- fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
+ fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
- nrecs := 0
- for r.Next() {
- nrecs++
+ nrecs := 0
+ for r.Next() {
+ nrecs++
+ }
+ fmt.Fprintf(w, "records: %d\n", nrecs)
}
- fmt.Fprintf(w, "records: %d\n", nrecs)
return nil
}
@@ -121,11 +128,26 @@ func processFile(w io.Writer, fname string) error {
}
defer f.Close()
+ hdr := make([]byte, len(ipc.Magic))
+ _, err = io.ReadFull(f, hdr)
+ if err != nil {
+ return errors.Errorf("could not read file header: %v", err)
+ }
+ f.Seek(0, io.SeekStart)
+
+ if !bytes.Equal(hdr, ipc.Magic) {
+ // try as a stream.
+ return processStream(w, f)
+ }
+
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(nil, 0)
r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
if err != nil {
+ if errors.Cause(err) == io.EOF {
+ return nil
+ }
return err
}
defer r.Close()
@@ -133,6 +155,7 @@ func processFile(w io.Writer, fname string) error {
fmt.Fprintf(w, "version: %v\n", r.Version())
fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
fmt.Fprintf(w, "records: %d\n", r.NumRecords())
+
return nil
}