You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/04/30 19:23:22 UTC

[arrow] branch master updated: ARROW-5173: [Go] handle multiple concatenated record batches

This is an automated email from the ASF dual-hosted git repository.

sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 30a473e  ARROW-5173: [Go] handle multiple concatenated record batches
30a473e is described below

commit 30a473e67635dbcff891ed5bb7a2235b5035d171
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Tue Apr 30 21:22:19 2019 +0200

    ARROW-5173: [Go] handle multiple concatenated record batches
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4160 from sbinet/issue-5173 and squashes the following commits:
    
    623c3d2a <Sebastien Binet> ARROW-5173:  handle multiple concatenated record batches
---
 go/arrow/ipc/cmd/arrow-cat/main.go | 35 +++++++++++++++++++------------
 go/arrow/ipc/cmd/arrow-ls/main.go  | 43 +++++++++++++++++++++++++++++---------
 2 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/go/arrow/ipc/cmd/arrow-cat/main.go b/go/arrow/ipc/cmd/arrow-cat/main.go
index cb4ff5a..e5f75e7 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main.go
@@ -89,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error {
 	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer mem.AssertSize(nil, 0)
 
-	r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
-	if err != nil {
-		return err
-	}
-	defer r.Release()
-
-	n := 0
-	for r.Next() {
-		n++
-		fmt.Fprintf(w, "record %d...\n", n)
-		rec := r.Record()
-		for i, col := range rec.Columns() {
-			fmt.Fprintf(w, "  col[%d] %q: %v\n", i, rec.ColumnName(i), col)
+	for {
+		r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
+		if err != nil {
+			if errors.Cause(err) == io.EOF {
+				return nil
+			}
+			return err
+		}
+		defer r.Release()
+
+		n := 0
+		for r.Next() {
+			n++
+			fmt.Fprintf(w, "record %d...\n", n)
+			rec := r.Record()
+			for i, col := range rec.Columns() {
+				fmt.Fprintf(w, "  col[%d] %q: %v\n", i, rec.ColumnName(i), col)
+			}
 		}
 	}
 	return nil
@@ -142,6 +147,9 @@ func processFile(w io.Writer, fname string) error {
 
 	r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
 	if err != nil {
+		if errors.Cause(err) == io.EOF {
+			return nil
+		}
 		return err
 	}
 	defer r.Close()
@@ -159,6 +167,7 @@ func processFile(w io.Writer, fname string) error {
 			fmt.Fprintf(w, "  col[%d] %q: %v\n", i, rec.ColumnName(i), col)
 		}
 	}
+
 	return nil
 }
 
diff --git a/go/arrow/ipc/cmd/arrow-ls/main.go b/go/arrow/ipc/cmd/arrow-ls/main.go
index b858bf5..cf9a765 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main.go
@@ -53,6 +53,7 @@
 package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-ls"
 
 import (
+	"bytes"
 	"flag"
 	"fmt"
 	"io"
@@ -63,6 +64,7 @@ import (
 	"github.com/apache/arrow/go/arrow"
 	"github.com/apache/arrow/go/arrow/ipc"
 	"github.com/apache/arrow/go/arrow/memory"
+	"github.com/pkg/errors"
 )
 
 func main() {
@@ -87,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error {
 	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer mem.AssertSize(nil, 0)
 
-	r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
-	if err != nil {
-		return err
-	}
-	defer r.Release()
+	for {
+		r, err := ipc.NewReader(rin, ipc.WithAllocator(mem))
+		if err != nil {
+			if errors.Cause(err) == io.EOF {
+				return nil
+			}
+			return err
+		}
+		defer r.Release()
 
-	fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
+		fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
 
-	nrecs := 0
-	for r.Next() {
-		nrecs++
+		nrecs := 0
+		for r.Next() {
+			nrecs++
+		}
+		fmt.Fprintf(w, "records: %d\n", nrecs)
 	}
-	fmt.Fprintf(w, "records: %d\n", nrecs)
 	return nil
 }
 
@@ -121,11 +128,26 @@ func processFile(w io.Writer, fname string) error {
 	}
 	defer f.Close()
 
+	hdr := make([]byte, len(ipc.Magic))
+	_, err = io.ReadFull(f, hdr)
+	if err != nil {
+		return errors.Errorf("could not read file header: %v", err)
+	}
+	f.Seek(0, io.SeekStart)
+
+	if !bytes.Equal(hdr, ipc.Magic) {
+		// try as a stream.
+		return processStream(w, f)
+	}
+
 	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer mem.AssertSize(nil, 0)
 
 	r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem))
 	if err != nil {
+		if errors.Cause(err) == io.EOF {
+			return nil
+		}
 		return err
 	}
 	defer r.Close()
@@ -133,6 +155,7 @@ func processFile(w io.Writer, fname string) error {
 	fmt.Fprintf(w, "version: %v\n", r.Version())
 	fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema()))
 	fmt.Fprintf(w, "records: %d\n", r.NumRecords())
+
 	return nil
 }