You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/02 17:36:00 UTC
[arrow] branch master updated: ARROW-3684: [Go] Add chunking ability to CSV reader
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new fd2f798 ARROW-3684: [Go] Add chunking ability to CSV reader
fd2f798 is described below
commit fd2f798adccaf1644aa32a9b9cf2873a8f540142
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sun Dec 2 11:35:53 2018 -0600
ARROW-3684: [Go] Add chunking ability to CSV reader
Author: Sebastien Binet <bi...@cern.ch>
Closes #3019 from sbinet/issue-3684 and squashes the following commits:
64e88515e <Sebastien Binet> ARROW-3684: Add chunking ability to CSV reader
---
go/arrow/csv/csv.go | 98 +++++++++++++++-
go/arrow/csv/csv_test.go | 295 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 387 insertions(+), 6 deletions(-)
diff --git a/go/arrow/csv/csv.go b/go/arrow/csv/csv.go
index 36f3abd..79f2280 100644
--- a/go/arrow/csv/csv.go
+++ b/go/arrow/csv/csv.go
@@ -17,8 +17,6 @@
// Package csv reads CSV files and presents the extracted data as records.
package csv
-// TODO: implement a row chunker to accumulate N rows into the current record.
-
import (
"encoding/csv"
"errors"
@@ -61,6 +59,19 @@ func WithAllocator(mem memory.Allocator) Option {
}
}
+// WithChunk returns an Option that sets how many CSV rows the reader packs
+// into each record.
+//
+// The value of n selects one of three modes:
+//   - n == 0 or n == 1: no chunking, one record is created per row.
+//   - n > 1: rows are read in chunks of n, one record per chunk.
+//   - n < 0: the whole CSV file is loaded into memory and returned as one
+//     big record holding all the rows.
+func WithChunk(n int) Option {
+	setChunk := func(rd *Reader) {
+		rd.chunk = n
+	}
+	return setChunk
+}
+
// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
type Reader struct {
r *csv.Reader
@@ -71,6 +82,10 @@ type Reader struct {
cur array.Record
err error
+ chunk int
+ done bool
+ next func() bool
+
mem memory.Allocator
}
@@ -82,7 +97,7 @@ type Reader struct {
func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
validate(schema)
- rr := &Reader{r: csv.NewReader(r), schema: schema, refs: 1}
+ rr := &Reader{r: csv.NewReader(r), schema: schema, refs: 1, chunk: 1}
for _, opt := range opts {
opt(rr)
}
@@ -93,6 +108,14 @@ func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
rr.bld = array.NewRecordBuilder(rr.mem, rr.schema)
+ switch {
+ case rr.chunk < 0:
+ rr.next = rr.nextall
+ case rr.chunk > 1:
+ rr.next = rr.nextn
+ default:
+ rr.next = rr.next1
+ }
return rr
}
@@ -117,13 +140,20 @@ func (r *Reader) Next() bool {
r.cur = nil
}
- if r.err != nil {
+ if r.err != nil || r.done {
return false
}
+ return r.next()
+}
+
+// next1 reads one row from the CSV file and creates a single Record
+// from that row.
+func (r *Reader) next1() bool {
var recs []string
recs, r.err = r.r.Read()
if r.err != nil {
+ r.done = true
if r.err == io.EOF {
r.err = nil
}
@@ -132,8 +162,65 @@ func (r *Reader) Next() bool {
r.validate(recs)
r.read(recs)
+ r.cur = r.bld.NewRecord()
- return r.err == nil
+ return true
+}
+
+// nextall slurps every remaining CSV row in one go and packs all of them
+// into a single Record.
+//
+// The reader is flagged as done when this method returns, whatever the
+// outcome. It reports false, leaving the failure in r.err, when the
+// underlying csv reader cannot read the rows.
+func (r *Reader) nextall() bool {
+	defer func() { r.done = true }()
+
+	rows, err := r.r.ReadAll()
+	r.err = err
+	if err != nil {
+		return false
+	}
+
+	for i := range rows {
+		r.validate(rows[i])
+		r.read(rows[i])
+	}
+
+	r.cur = r.bld.NewRecord()
+	return true
+}
+
+// nextn reads up to r.chunk rows from the CSV file and creates a Record from
+// the rows it managed to read.
+//
+// It returns true when at least one row went into the Record. Reaching the
+// end of the input marks the reader as done without recording an error; any
+// other read failure marks it as done and is kept in r.err.
+func (r *Reader) nextn() bool {
+	n := 0
+
+	for i := 0; i < r.chunk && !r.done; i++ {
+		// Read into a local error: assigning the result straight to r.err
+		// would wipe out an error recorded while handling an earlier row of
+		// this chunk.
+		// NOTE(review): assumes validate/read may store a conversion error in
+		// r.err; their bodies are not visible here, please confirm.
+		recs, err := r.r.Read()
+		if err != nil {
+			r.done = true
+			// io.EOF is the normal end of input, not a failure. Keep the
+			// first error seen rather than the last one.
+			if err != io.EOF && r.err == nil {
+				r.err = err
+			}
+			break
+		}
+
+		r.validate(recs)
+		r.read(recs)
+		n++
+	}
+
+	// Any recorded error ends the iteration for good.
+	if r.err != nil {
+		r.done = true
+	}
+
+	// The Record is built even when no row was read, as before.
+	r.cur = r.bld.NewRecord()
+	return n > 0
}
func (r *Reader) validate(recs []string) {
@@ -193,7 +280,6 @@ func (r *Reader) read(recs []string) {
r.bld.Field(i).(*array.StringBuilder).Append(str)
}
}
- r.cur = r.bld.NewRecord()
}
func (r *Reader) readI8(str string) int8 {
diff --git a/go/arrow/csv/csv_test.go b/go/arrow/csv/csv_test.go
index 534e8ea..f53cf17 100644
--- a/go/arrow/csv/csv_test.go
+++ b/go/arrow/csv/csv_test.go
@@ -89,6 +89,52 @@ func Example() {
// rec[2]["str"]: ["str-9"]
}
+// Example_withChunk reads testdata/simple.csv in chunks of 3 rows and prints
+// every column of each record the reader produces.
+func Example_withChunk() {
+	f, err := os.Open("testdata/simple.csv")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer f.Close()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+	r := csv.NewReader(
+		f, schema,
+		csv.WithComment('#'), csv.WithComma(';'),
+		csv.WithChunk(3),
+	)
+	defer r.Release()
+
+	for r.Next() {
+		rec := r.Record()
+		for i, col := range rec.Columns() {
+			fmt.Printf("rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
+		}
+	}
+
+	// Next returns false both at the end of the input and after an error;
+	// Err tells the two apart.
+	if err := r.Err(); err != nil {
+		log.Fatal(err)
+	}
+
+	// Output:
+	// rec[0]["i64"]: [0 1 2]
+	// rec[1]["f64"]: [0 1 2]
+	// rec[2]["str"]: ["str-0" "str-1" "str-2"]
+	// rec[0]["i64"]: [3 4 5]
+	// rec[1]["f64"]: [3 4 5]
+	// rec[2]["str"]: ["str-3" "str-4" "str-5"]
+	// rec[0]["i64"]: [6 7 8]
+	// rec[1]["f64"]: [6 7 8]
+	// rec[2]["str"]: ["str-6" "str-7" "str-8"]
+	// rec[0]["i64"]: [9]
+	// rec[1]["f64"]: [9]
+	// rec[2]["str"]: ["str-9"]
+}
+
func TestCSVReader(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
@@ -190,3 +236,252 @@ rec[11]["str"]: ["str-2"]
r.Release()
}
}
+
+// TestCSVReaderWithChunk runs the CSV reader over testdata/simple.csv once
+// per chunk setting and checks the number of records produced, their
+// printed contents, and that the reader reports no error.
+func TestCSVReaderWithChunk(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	raw, err := ioutil.ReadFile("testdata/simple.csv")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			arrow.Field{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	// withChunk builds the option list used by every case that sets an
+	// explicit chunk size.
+	withChunk := func(n int) []csv.Option {
+		return []csv.Option{
+			csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+			csv.WithChunk(n),
+		}
+	}
+
+	// rowByRow is the expected dump when every row becomes its own record.
+	const rowByRow = `rec[0]["i64"]: [0]
+rec[1]["f64"]: [0]
+rec[2]["str"]: ["str-0"]
+rec[0]["i64"]: [1]
+rec[1]["f64"]: [1]
+rec[2]["str"]: ["str-1"]
+rec[0]["i64"]: [2]
+rec[1]["f64"]: [2]
+rec[2]["str"]: ["str-2"]
+rec[0]["i64"]: [3]
+rec[1]["f64"]: [3]
+rec[2]["str"]: ["str-3"]
+rec[0]["i64"]: [4]
+rec[1]["f64"]: [4]
+rec[2]["str"]: ["str-4"]
+rec[0]["i64"]: [5]
+rec[1]["f64"]: [5]
+rec[2]["str"]: ["str-5"]
+rec[0]["i64"]: [6]
+rec[1]["f64"]: [6]
+rec[2]["str"]: ["str-6"]
+rec[0]["i64"]: [7]
+rec[1]["f64"]: [7]
+rec[2]["str"]: ["str-7"]
+rec[0]["i64"]: [8]
+rec[1]["f64"]: [8]
+rec[2]["str"]: ["str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`
+
+	// oneRecord is the expected dump when all ten rows land in one record.
+	const oneRecord = `rec[0]["i64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[1]["f64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5" "str-6" "str-7" "str-8" "str-9"]
+`
+
+	cases := []struct {
+		name    string
+		opts    []csv.Option
+		records int
+		want    string
+	}{
+		{
+			name:    "chunk=default",
+			opts:    []csv.Option{csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';')},
+			records: 10,
+			want:    rowByRow,
+		},
+		{name: "chunk=0", opts: withChunk(0), records: 10, want: rowByRow},
+		{name: "chunk=1", opts: withChunk(1), records: 10, want: rowByRow},
+		{
+			name:    "chunk=3",
+			opts:    withChunk(3),
+			records: 4,
+			want: `rec[0]["i64"]: [0 1 2]
+rec[1]["f64"]: [0 1 2]
+rec[2]["str"]: ["str-0" "str-1" "str-2"]
+rec[0]["i64"]: [3 4 5]
+rec[1]["f64"]: [3 4 5]
+rec[2]["str"]: ["str-3" "str-4" "str-5"]
+rec[0]["i64"]: [6 7 8]
+rec[1]["f64"]: [6 7 8]
+rec[2]["str"]: ["str-6" "str-7" "str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`,
+		},
+		{
+			name:    "chunk=6",
+			opts:    withChunk(6),
+			records: 2,
+			want: `rec[0]["i64"]: [0 1 2 3 4 5]
+rec[1]["f64"]: [0 1 2 3 4 5]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5"]
+rec[0]["i64"]: [6 7 8 9]
+rec[1]["f64"]: [6 7 8 9]
+rec[2]["str"]: ["str-6" "str-7" "str-8" "str-9"]
+`,
+		},
+		{name: "chunk=10", opts: withChunk(10), records: 1, want: oneRecord},
+		{name: "chunk=11", opts: withChunk(11), records: 1, want: oneRecord},
+		{name: "chunk=-1", opts: withChunk(-1), records: 1, want: oneRecord},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			r := csv.NewReader(bytes.NewReader(raw), schema, tc.opts...)
+
+			defer r.Release()
+
+			// Exercise the reference counting once before reading.
+			r.Retain()
+			r.Release()
+
+			if got := r.Schema(); !got.Equal(schema) {
+				t.Fatalf("invalid schema: got=%v, want=%v", got, schema)
+			}
+
+			var out bytes.Buffer
+
+			// Dump every column of every record and count the records.
+			n := 0
+			for ; r.Next(); n++ {
+				rec := r.Record()
+				for i, col := range rec.Columns() {
+					fmt.Fprintf(&out, "rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
+				}
+			}
+
+			if n != tc.records {
+				t.Fatalf("invalid number of records: got=%d, want=%d", n, tc.records)
+			}
+
+			if got := out.String(); got != tc.want {
+				t.Fatalf("invalid output:\ngot:\n%s\nwant:\n%s\n", got, tc.want)
+			}
+
+			if r.Err() != nil {
+				t.Fatalf("unexpected error: %v", r.Err())
+			}
+		})
+	}
+}