Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/02 17:36:00 UTC

[arrow] branch master updated: ARROW-3684: [Go] Add chunking ability to CSV reader

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new fd2f798  ARROW-3684: [Go] Add chunking ability to CSV reader
fd2f798 is described below

commit fd2f798adccaf1644aa32a9b9cf2873a8f540142
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sun Dec 2 11:35:53 2018 -0600

    ARROW-3684: [Go] Add chunking ability to CSV reader
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #3019 from sbinet/issue-3684 and squashes the following commits:
    
    64e88515e <Sebastien Binet> ARROW-3684:  Add chunking ability to CSV reader
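
    For readers skimming the patch, the new option composes with the existing
    reader options. Below is a minimal sketch of its use, distilled from the
    tests in this commit; it assumes the github.com/apache/arrow/go/arrow
    import path and a testdata/simple.csv file with semicolon-separated
    i64/f64/str rows, as in the test data.

	package main

	import (
		"fmt"
		"log"
		"os"

		"github.com/apache/arrow/go/arrow"
		"github.com/apache/arrow/go/arrow/csv"
	)

	func main() {
		f, err := os.Open("testdata/simple.csv")
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()

		schema := arrow.NewSchema(
			[]arrow.Field{
				{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
				{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
				{Name: "str", Type: arrow.BinaryTypes.String},
			},
			nil,
		)

		// WithChunk(n) with n > 1 batches n rows per record; n < 0 loads
		// the whole file into one record; n == 0 or n == 1 keeps the
		// one-record-per-row default.
		r := csv.NewReader(
			f, schema,
			csv.WithComment('#'), csv.WithComma(';'),
			csv.WithChunk(3),
		)
		defer r.Release()

		for r.Next() {
			rec := r.Record()
			for i, col := range rec.Columns() {
				fmt.Printf("rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
			}
		}
		if err := r.Err(); err != nil {
			log.Fatal(err)
		}
	}

    With WithChunk(3), the ten-row test file yields four records: three of
    three rows each and a final one with the remaining row.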
---
 go/arrow/csv/csv.go      |  98 +++++++++++++++-
 go/arrow/csv/csv_test.go | 295 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 387 insertions(+), 6 deletions(-)

diff --git a/go/arrow/csv/csv.go b/go/arrow/csv/csv.go
index 36f3abd..79f2280 100644
--- a/go/arrow/csv/csv.go
+++ b/go/arrow/csv/csv.go
@@ -17,8 +17,6 @@
 // Package csv reads CSV files and presents the extracted data as records.
 package csv
 
-// TODO: implement a row chunker to accumulate N rows into the current record.
-
 import (
 	"encoding/csv"
 	"errors"
@@ -61,6 +59,19 @@ func WithAllocator(mem memory.Allocator) Option {
 	}
 }
 
+// WithChunk specifies the chunk size used while parsing CSV files.
+//
+// If n is zero or 1, no chunking will take place and the reader will create
+// one record per row.
+// If n is greater than 1, chunks of n rows will be read.
+// If n is negative, the reader will load the whole CSV file into memory and
+// create one big record with all the rows.
+func WithChunk(n int) Option {
+	return func(r *Reader) {
+		r.chunk = n
+	}
+}
+
 // Reader wraps encoding/csv.Reader and creates array.Records from a schema.
 type Reader struct {
 	r      *csv.Reader
@@ -71,6 +82,10 @@ type Reader struct {
 	cur  array.Record
 	err  error
 
+	chunk int
+	done  bool
+	next  func() bool
+
 	mem memory.Allocator
 }
 
@@ -82,7 +97,7 @@ type Reader struct {
 func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
 	validate(schema)
 
-	rr := &Reader{r: csv.NewReader(r), schema: schema, refs: 1}
+	rr := &Reader{r: csv.NewReader(r), schema: schema, refs: 1, chunk: 1}
 	for _, opt := range opts {
 		opt(rr)
 	}
@@ -93,6 +108,14 @@ func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
 
 	rr.bld = array.NewRecordBuilder(rr.mem, rr.schema)
 
+	switch {
+	case rr.chunk < 0:
+		rr.next = rr.nextall
+	case rr.chunk > 1:
+		rr.next = rr.nextn
+	default:
+		rr.next = rr.next1
+	}
 	return rr
 }
 
@@ -117,13 +140,20 @@ func (r *Reader) Next() bool {
 		r.cur = nil
 	}
 
-	if r.err != nil {
+	if r.err != nil || r.done {
 		return false
 	}
 
+	return r.next()
+}
+
+// next1 reads one row from the CSV file and creates a single Record
+// from that row.
+func (r *Reader) next1() bool {
 	var recs []string
 	recs, r.err = r.r.Read()
 	if r.err != nil {
+		r.done = true
 		if r.err == io.EOF {
 			r.err = nil
 		}
@@ -132,8 +162,65 @@ func (r *Reader) Next() bool {
 
 	r.validate(recs)
 	r.read(recs)
+	r.cur = r.bld.NewRecord()
 
-	return r.err == nil
+	return true
+}
+
+// nextall reads the whole CSV file into memory and creates one single
+// Record from all the CSV rows.
+func (r *Reader) nextall() bool {
+	defer func() {
+		r.done = true
+	}()
+
+	var (
+		recs [][]string
+	)
+
+	recs, r.err = r.r.ReadAll()
+	if r.err != nil {
+		return false
+	}
+
+	for _, rec := range recs {
+		r.validate(rec)
+		r.read(rec)
+	}
+	r.cur = r.bld.NewRecord()
+
+	return true
+}
+
+// nextn reads n rows from the CSV file, where n is the chunk size, and creates
+// a Record from these rows.
+func (r *Reader) nextn() bool {
+	var (
+		recs []string
+		n    = 0
+	)
+
+	for i := 0; i < r.chunk && !r.done; i++ {
+		recs, r.err = r.r.Read()
+		if r.err != nil {
+			r.done = true
+			break
+		}
+
+		r.validate(recs)
+		r.read(recs)
+		n++
+	}
+
+	if r.err != nil {
+		r.done = true
+		if r.err == io.EOF {
+			r.err = nil
+		}
+	}
+
+	r.cur = r.bld.NewRecord()
+	return n > 0
 }
 
 func (r *Reader) validate(recs []string) {
@@ -193,7 +280,6 @@ func (r *Reader) read(recs []string) {
 			r.bld.Field(i).(*array.StringBuilder).Append(str)
 		}
 	}
-	r.cur = r.bld.NewRecord()
 }
 
 func (r *Reader) readI8(str string) int8 {
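
Two details of the csv.go changes are worth calling out. Next() now only does
the shared bookkeeping (releasing the previous record, checking err/done) and
delegates to one of three strategies bound once in NewReader: next1, nextn, or
nextall. And the `r.cur = r.bld.NewRecord()` call moves out of read() into
each strategy, so the RecordBuilder can accumulate several rows before a
record is cut. A minimal sketch of that builder behavior, under the same
import-path assumption as the sketch above:

	package main

	import (
		"fmt"

		"github.com/apache/arrow/go/arrow"
		"github.com/apache/arrow/go/arrow/array"
		"github.com/apache/arrow/go/arrow/memory"
	)

	func main() {
		schema := arrow.NewSchema(
			[]arrow.Field{{Name: "i64", Type: arrow.PrimitiveTypes.Int64}},
			nil,
		)

		bld := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
		defer bld.Release()

		// Append three "rows" before cutting a record, as nextn does when
		// it calls read() up to chunk-size times.
		for i := int64(0); i < 3; i++ {
			bld.Field(0).(*array.Int64Builder).Append(i)
		}

		rec := bld.NewRecord() // snapshots pending rows, resets the builder
		defer rec.Release()

		fmt.Println(rec.Columns()[0]) // [0 1 2]
	}

NewRecord snapshots the pending values and resets the builder, which is
exactly what nextn relies on when it builds one record per chunk.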
diff --git a/go/arrow/csv/csv_test.go b/go/arrow/csv/csv_test.go
index 534e8ea..f53cf17 100644
--- a/go/arrow/csv/csv_test.go
+++ b/go/arrow/csv/csv_test.go
@@ -89,6 +89,52 @@ func Example() {
 	// rec[2]["str"]: ["str-9"]
 }
 
+func Example_withChunk() {
+	f, err := os.Open("testdata/simple.csv")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer f.Close()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			arrow.Field{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+	r := csv.NewReader(
+		f, schema,
+		csv.WithComment('#'), csv.WithComma(';'),
+		csv.WithChunk(3),
+	)
+	defer r.Release()
+
+	n := 0
+	for r.Next() {
+		rec := r.Record()
+		for i, col := range rec.Columns() {
+			fmt.Printf("rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
+		}
+		n++
+	}
+
+	// Output:
+	// rec[0]["i64"]: [0 1 2]
+	// rec[1]["f64"]: [0 1 2]
+	// rec[2]["str"]: ["str-0" "str-1" "str-2"]
+	// rec[0]["i64"]: [3 4 5]
+	// rec[1]["f64"]: [3 4 5]
+	// rec[2]["str"]: ["str-3" "str-4" "str-5"]
+	// rec[0]["i64"]: [6 7 8]
+	// rec[1]["f64"]: [6 7 8]
+	// rec[2]["str"]: ["str-6" "str-7" "str-8"]
+	// rec[0]["i64"]: [9]
+	// rec[1]["f64"]: [9]
+	// rec[2]["str"]: ["str-9"]
+}
+
 func TestCSVReader(t *testing.T) {
 	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer mem.AssertSize(t, 0)
@@ -190,3 +236,252 @@ rec[11]["str"]: ["str-2"]
 		r.Release()
 	}
 }
+
+func TestCSVReaderWithChunk(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	raw, err := ioutil.ReadFile("testdata/simple.csv")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			arrow.Field{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	for _, tc := range []struct {
+		name    string
+		opts    []csv.Option
+		records int
+		want    string
+	}{
+		{
+			name:    "chunk=default",
+			opts:    []csv.Option{csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';')},
+			records: 10,
+			want: `rec[0]["i64"]: [0]
+rec[1]["f64"]: [0]
+rec[2]["str"]: ["str-0"]
+rec[0]["i64"]: [1]
+rec[1]["f64"]: [1]
+rec[2]["str"]: ["str-1"]
+rec[0]["i64"]: [2]
+rec[1]["f64"]: [2]
+rec[2]["str"]: ["str-2"]
+rec[0]["i64"]: [3]
+rec[1]["f64"]: [3]
+rec[2]["str"]: ["str-3"]
+rec[0]["i64"]: [4]
+rec[1]["f64"]: [4]
+rec[2]["str"]: ["str-4"]
+rec[0]["i64"]: [5]
+rec[1]["f64"]: [5]
+rec[2]["str"]: ["str-5"]
+rec[0]["i64"]: [6]
+rec[1]["f64"]: [6]
+rec[2]["str"]: ["str-6"]
+rec[0]["i64"]: [7]
+rec[1]["f64"]: [7]
+rec[2]["str"]: ["str-7"]
+rec[0]["i64"]: [8]
+rec[1]["f64"]: [8]
+rec[2]["str"]: ["str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`,
+		},
+		{
+			name: "chunk=0",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(0),
+			},
+			records: 10,
+			want: `rec[0]["i64"]: [0]
+rec[1]["f64"]: [0]
+rec[2]["str"]: ["str-0"]
+rec[0]["i64"]: [1]
+rec[1]["f64"]: [1]
+rec[2]["str"]: ["str-1"]
+rec[0]["i64"]: [2]
+rec[1]["f64"]: [2]
+rec[2]["str"]: ["str-2"]
+rec[0]["i64"]: [3]
+rec[1]["f64"]: [3]
+rec[2]["str"]: ["str-3"]
+rec[0]["i64"]: [4]
+rec[1]["f64"]: [4]
+rec[2]["str"]: ["str-4"]
+rec[0]["i64"]: [5]
+rec[1]["f64"]: [5]
+rec[2]["str"]: ["str-5"]
+rec[0]["i64"]: [6]
+rec[1]["f64"]: [6]
+rec[2]["str"]: ["str-6"]
+rec[0]["i64"]: [7]
+rec[1]["f64"]: [7]
+rec[2]["str"]: ["str-7"]
+rec[0]["i64"]: [8]
+rec[1]["f64"]: [8]
+rec[2]["str"]: ["str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`,
+		},
+		{
+			name: "chunk=1",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(1),
+			},
+			records: 10,
+			want: `rec[0]["i64"]: [0]
+rec[1]["f64"]: [0]
+rec[2]["str"]: ["str-0"]
+rec[0]["i64"]: [1]
+rec[1]["f64"]: [1]
+rec[2]["str"]: ["str-1"]
+rec[0]["i64"]: [2]
+rec[1]["f64"]: [2]
+rec[2]["str"]: ["str-2"]
+rec[0]["i64"]: [3]
+rec[1]["f64"]: [3]
+rec[2]["str"]: ["str-3"]
+rec[0]["i64"]: [4]
+rec[1]["f64"]: [4]
+rec[2]["str"]: ["str-4"]
+rec[0]["i64"]: [5]
+rec[1]["f64"]: [5]
+rec[2]["str"]: ["str-5"]
+rec[0]["i64"]: [6]
+rec[1]["f64"]: [6]
+rec[2]["str"]: ["str-6"]
+rec[0]["i64"]: [7]
+rec[1]["f64"]: [7]
+rec[2]["str"]: ["str-7"]
+rec[0]["i64"]: [8]
+rec[1]["f64"]: [8]
+rec[2]["str"]: ["str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`,
+		},
+		{
+			name: "chunk=3",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(3),
+			},
+			records: 4,
+			want: `rec[0]["i64"]: [0 1 2]
+rec[1]["f64"]: [0 1 2]
+rec[2]["str"]: ["str-0" "str-1" "str-2"]
+rec[0]["i64"]: [3 4 5]
+rec[1]["f64"]: [3 4 5]
+rec[2]["str"]: ["str-3" "str-4" "str-5"]
+rec[0]["i64"]: [6 7 8]
+rec[1]["f64"]: [6 7 8]
+rec[2]["str"]: ["str-6" "str-7" "str-8"]
+rec[0]["i64"]: [9]
+rec[1]["f64"]: [9]
+rec[2]["str"]: ["str-9"]
+`,
+		},
+		{
+			name: "chunk=6",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(6),
+			},
+			records: 2,
+			want: `rec[0]["i64"]: [0 1 2 3 4 5]
+rec[1]["f64"]: [0 1 2 3 4 5]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5"]
+rec[0]["i64"]: [6 7 8 9]
+rec[1]["f64"]: [6 7 8 9]
+rec[2]["str"]: ["str-6" "str-7" "str-8" "str-9"]
+`,
+		},
+		{
+			name: "chunk=10",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(10),
+			},
+			records: 1,
+			want: `rec[0]["i64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[1]["f64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5" "str-6" "str-7" "str-8" "str-9"]
+`,
+		},
+		{
+			name: "chunk=11",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(11),
+			},
+			records: 1,
+			want: `rec[0]["i64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[1]["f64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5" "str-6" "str-7" "str-8" "str-9"]
+`,
+		},
+		{
+			name: "chunk=-1",
+			opts: []csv.Option{
+				csv.WithAllocator(mem), csv.WithComment('#'), csv.WithComma(';'),
+				csv.WithChunk(-1),
+			},
+			records: 1,
+			want: `rec[0]["i64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[1]["f64"]: [0 1 2 3 4 5 6 7 8 9]
+rec[2]["str"]: ["str-0" "str-1" "str-2" "str-3" "str-4" "str-5" "str-6" "str-7" "str-8" "str-9"]
+`,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			r := csv.NewReader(bytes.NewReader(raw), schema, tc.opts...)
+
+			defer r.Release()
+
+			r.Retain()
+			r.Release()
+
+			if got, want := r.Schema(), schema; !got.Equal(want) {
+				t.Fatalf("invalid schema: got=%v, want=%v", got, want)
+			}
+
+			out := new(bytes.Buffer)
+
+			n := 0
+			for r.Next() {
+				rec := r.Record()
+				for i, col := range rec.Columns() {
+					fmt.Fprintf(out, "rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
+				}
+				n++
+			}
+
+			if got, want := n, tc.records; got != want {
+				t.Fatalf("invalid number of records: got=%d, want=%d", got, want)
+			}
+
+			if got, want := out.String(), tc.want; got != want {
+				t.Fatalf("invalid output:\ngot:\n%s\nwant:\n%s\n", got, want)
+			}
+
+			if r.Err() != nil {
+				t.Fatalf("unexpected error: %v", r.Err())
+			}
+		})
+	}
+}
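
One usage note, hedged since it follows from reading the release logic at the
top of Next rather than from documented behavior: Next appears to release the
previously returned record, so the result of Record() is only valid until the
next call to Next (or Release on the reader). A caller that wants to keep
records around should Retain them, as the Retain/Release pair in the test
hints; for example, by replacing the read loop of the first sketch with the
hypothetical fragment below (adding "github.com/apache/arrow/go/arrow/array"
to its imports):

		var recs []array.Record
		for r.Next() {
			rec := r.Record()
			rec.Retain() // keep the record alive past the next call to Next
			recs = append(recs, rec)
		}
		if err := r.Err(); err != nil {
			log.Fatal(err)
		}

		for _, rec := range recs {
			fmt.Println(rec.NumRows()) // use the records after iteration
			rec.Release()
		}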