You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/06/11 16:45:48 UTC

[arrow] branch master updated: ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout

This is an automated email from the ASF dual-hosted git repository.

sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 71bcfdf  ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
71bcfdf is described below

commit 71bcfdfd2bb4f82c9652012001ffed882e7b968d
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Tue Jun 11 18:45:23 2019 +0200

    ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4517 from sbinet/issue-5551 and squashes the following commits:
    
    47162f07a <Sebastien Binet> ARROW-5551:  implement FixedSizeArrays with 2-buffers layout
---
 go/arrow/array/array_test.go                   |  2 +-
 go/arrow/array/fixedsize_binary.go             | 45 +++++++----------------
 go/arrow/array/fixedsize_binary_test.go        |  8 +++--
 go/arrow/array/fixedsize_binarybuilder.go      | 50 +++++++-------------------
 go/arrow/array/fixedsize_binarybuilder_test.go |  9 -----
 go/arrow/internal/arrdata/arrdata.go           | 50 ++++++++++++++++++++++++++
 go/arrow/ipc/cmd/arrow-cat/main_test.go        | 32 +++++++++++++++++
 go/arrow/ipc/cmd/arrow-ls/main_test.go         | 26 ++++++++++++++
 go/arrow/ipc/file_reader.go                    |  2 +-
 go/arrow/ipc/writer.go                         | 28 ++++++++++++---
 10 files changed, 165 insertions(+), 87 deletions(-)

diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index 3a3407f..884bb8d 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -63,7 +63,7 @@ func TestMakeFromData(t *testing.T) {
 		{name: "timestamp", d: &testDataType{arrow.TIMESTAMP}},
 		{name: "time32", d: &testDataType{arrow.TIME32}},
 		{name: "time64", d: &testDataType{arrow.TIME64}},
-		{name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}, size: 3},
+		{name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}},
 
 		{name: "list", d: &testDataType{arrow.LIST}, child: []*array.Data{
 			array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
diff --git a/go/arrow/array/fixedsize_binary.go b/go/arrow/array/fixedsize_binary.go
index db24a86..5134302 100644
--- a/go/arrow/array/fixedsize_binary.go
+++ b/go/arrow/array/fixedsize_binary.go
@@ -26,13 +26,14 @@ import (
 // A type which represents an immutable sequence of fixed-length binary strings.
 type FixedSizeBinary struct {
 	array
-	valueOffsets []int32
-	valueBytes   []byte
+
+	valueBytes []byte
+	bytewidth  int32
 }
 
 // NewFixedSizeBinaryData constructs a new fixed-size binary array from data.
 func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
-	a := &FixedSizeBinary{}
+	a := &FixedSizeBinary{bytewidth: int32(data.DataType().(arrow.FixedWidthDataType).BitWidth() / 8)}
 	a.refCount = 1
 	a.setData(data)
 	return a
@@ -41,14 +42,14 @@ func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
 // Value returns the fixed-size slice at index i. This value should not be mutated.
 func (a *FixedSizeBinary) Value(i int) []byte {
 	i += a.array.data.offset
-	return a.valueBytes[a.valueOffsets[i]:a.valueOffsets[i+1]]
+	var (
+		bw  = int(a.bytewidth)
+		beg = i * bw
+		end = (i + 1) * bw
+	)
+	return a.valueBytes[beg:end]
 }
 
-func (a *FixedSizeBinary) ValueOffset(i int) int { return int(a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueLen(i int) int    { return int(a.valueOffsets[i+1] - a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueOffsets() []int32 { return a.valueOffsets }
-func (a *FixedSizeBinary) ValueBytes() []byte    { return a.valueBytes }
-
 func (a *FixedSizeBinary) String() string {
 	o := new(strings.Builder)
 	o.WriteString("[")
@@ -68,32 +69,12 @@ func (a *FixedSizeBinary) String() string {
 }
 
 func (a *FixedSizeBinary) setData(data *Data) {
-	if len(data.buffers) != 3 {
-		panic("len(data.buffers) != 3")
-	}
-
 	a.array.setData(data)
-
-	if valueBytes := data.buffers[2]; valueBytes != nil {
-		a.valueBytes = valueBytes.Bytes()
+	vals := data.buffers[1]
+	if vals != nil {
+		a.valueBytes = vals.Bytes()
 	}
 
-	switch valueOffsets := data.buffers[1]; valueOffsets {
-	case nil:
-		// re-compute offsets
-		offsets := make([]int32, a.Len()+1)
-		bw := a.DataType().(arrow.FixedWidthDataType).BitWidth() / 8
-		for i := range offsets[1:] {
-			var delta int32
-			if a.IsValid(i) {
-				delta = int32(bw)
-			}
-			offsets[i+1] = offsets[i] + delta
-		}
-		a.valueOffsets = offsets
-	default:
-		a.valueOffsets = arrow.Int32Traits.CastFromBytes(valueOffsets.Bytes())
-	}
 }
 
 var (
diff --git a/go/arrow/array/fixedsize_binary_test.go b/go/arrow/array/fixedsize_binary_test.go
index fdb0fbf..4d2d724 100644
--- a/go/arrow/array/fixedsize_binary_test.go
+++ b/go/arrow/array/fixedsize_binary_test.go
@@ -33,6 +33,8 @@ func TestFixedSizeBinary(t *testing.T) {
 	dtype := arrow.FixedSizeBinaryType{ByteWidth: 7}
 	b := array.NewFixedSizeBinaryBuilder(mem, &dtype)
 
+	zero := make([]byte, dtype.ByteWidth)
+
 	values := [][]byte{
 		[]byte("7654321"),
 		nil,
@@ -48,7 +50,9 @@ func TestFixedSizeBinary(t *testing.T) {
 	assert.Equal(t, 3, a.Len())
 	assert.Equal(t, 1, a.NullN())
 	assert.Equal(t, []byte("7654321"), a.Value(0))
-	assert.Equal(t, []byte{}, a.Value(1))
+	assert.Equal(t, zero, a.Value(1))
+	assert.Equal(t, true, a.IsNull(1))
+	assert.Equal(t, false, a.IsValid(1))
 	assert.Equal(t, []byte("AZERTYU"), a.Value(2))
 	a.Release()
 
@@ -58,7 +62,7 @@ func TestFixedSizeBinary(t *testing.T) {
 	assert.Equal(t, 3, a.Len())
 	assert.Equal(t, 1, a.NullN())
 	assert.Equal(t, []byte("7654321"), a.Value(0))
-	assert.Equal(t, []byte{}, a.Value(1))
+	assert.Equal(t, zero, a.Value(1))
 	assert.Equal(t, []byte("AZERTYU"), a.Value(2))
 	a.Release()
 
diff --git a/go/arrow/array/fixedsize_binarybuilder.go b/go/arrow/array/fixedsize_binarybuilder.go
index 053a192..8a2f65f 100644
--- a/go/arrow/array/fixedsize_binarybuilder.go
+++ b/go/arrow/array/fixedsize_binarybuilder.go
@@ -17,6 +17,7 @@
 package array
 
 import (
+	"fmt"
 	"sync/atomic"
 
 	"github.com/apache/arrow/go/arrow"
@@ -28,16 +29,14 @@ import (
 type FixedSizeBinaryBuilder struct {
 	builder
 
-	dtype   *arrow.FixedSizeBinaryType
-	offsets *int32BufferBuilder
-	values  *byteBufferBuilder
+	dtype  *arrow.FixedSizeBinaryType
+	values *byteBufferBuilder
 }
 
 func NewFixedSizeBinaryBuilder(mem memory.Allocator, dtype *arrow.FixedSizeBinaryType) *FixedSizeBinaryBuilder {
 	b := &FixedSizeBinaryBuilder{
 		builder: builder{refCount: 1, mem: mem},
 		dtype:   dtype,
-		offsets: newInt32BufferBuilder(mem),
 		values:  newByteBufferBuilder(mem),
 	}
 	return b
@@ -54,10 +53,6 @@ func (b *FixedSizeBinaryBuilder) Release() {
 			b.nullBitmap.Release()
 			b.nullBitmap = nil
 		}
-		if b.offsets != nil {
-			b.offsets.Release()
-			b.offsets = nil
-		}
 		if b.values != nil {
 			b.values.Release()
 			b.values = nil
@@ -72,14 +67,13 @@ func (b *FixedSizeBinaryBuilder) Append(v []byte) {
 	}
 
 	b.Reserve(1)
-	b.appendNextOffset()
 	b.values.Append(v)
 	b.UnsafeAppendBoolToBitmap(true)
 }
 
 func (b *FixedSizeBinaryBuilder) AppendNull() {
 	b.Reserve(1)
-	b.appendNextOffset()
+	b.values.Advance(b.dtype.ByteWidth)
 	b.UnsafeAppendBoolToBitmap(false)
 }
 
@@ -97,25 +91,19 @@ func (b *FixedSizeBinaryBuilder) AppendValues(v [][]byte, valid []bool) {
 
 	b.Reserve(len(v))
 	for _, vv := range v {
-		b.appendNextOffset()
-		b.values.Append(vv)
+		switch len(vv) {
+		case 0:
+			b.values.Advance(b.dtype.ByteWidth)
+		case b.dtype.ByteWidth:
+			b.values.Append(vv)
+		default:
+			panic(fmt.Errorf("array: invalid binary length (got=%d, want=%d)", len(vv), b.dtype.ByteWidth))
+		}
 	}
 
 	b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
 }
 
-func (b *FixedSizeBinaryBuilder) Value(i int) []byte {
-	offsets := b.offsets.Values()
-	start := int(offsets[i])
-	var end int
-	if i == (b.length - 1) {
-		end = b.values.Len()
-	} else {
-		end = int(offsets[i+1])
-	}
-	return b.values.Bytes()[start:end]
-}
-
 func (b *FixedSizeBinaryBuilder) init(capacity int) {
 	b.builder.init(capacity)
 	b.values.resize(capacity * b.dtype.ByteWidth)
@@ -130,7 +118,6 @@ func (b *FixedSizeBinaryBuilder) Reserve(n int) {
 // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(),
 // additional memory will be allocated. If n is smaller, the allocated memory may reduced.
 func (b *FixedSizeBinaryBuilder) Resize(n int) {
-	b.offsets.resize((n + 1) * arrow.Int32SizeBytes)
 	b.builder.resize(n, b.init)
 }
 
@@ -150,29 +137,18 @@ func (b *FixedSizeBinaryBuilder) NewFixedSizeBinaryArray() (a *FixedSizeBinary)
 }
 
 func (b *FixedSizeBinaryBuilder) newData() (data *Data) {
-	b.appendNextOffset()
 	values := b.values.Finish()
-	offsets := b.offsets.Finish()
-	data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, offsets, values}, nil, b.nulls, 0)
+	data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, values}, nil, b.nulls, 0)
 
 	if values != nil {
 		values.Release()
 	}
-	if offsets != nil {
-		offsets.Release()
-	}
 
 	b.builder.reset()
 
 	return
 }
 
-func (b *FixedSizeBinaryBuilder) appendNextOffset() {
-	numBytes := b.values.Len()
-	// TODO(alexandre): check binaryArrayMaximumCapacity?
-	b.offsets.AppendValue(int32(numBytes))
-}
-
 var (
 	_ Builder = (*FixedSizeBinaryBuilder)(nil)
 )
diff --git a/go/arrow/array/fixedsize_binarybuilder_test.go b/go/arrow/array/fixedsize_binarybuilder_test.go
index f50e1b0..08740c5 100644
--- a/go/arrow/array/fixedsize_binarybuilder_test.go
+++ b/go/arrow/array/fixedsize_binarybuilder_test.go
@@ -39,11 +39,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
 	assert.Equal(t, 4, b.Len(), "unexpected Len()")
 	assert.Equal(t, 2, b.NullN(), "unexpected NullN()")
 
-	assert.Equal(t, b.Value(0), []byte("1234567"))
-	assert.Equal(t, b.Value(1), []byte{})
-	assert.Equal(t, b.Value(2), []byte("ABCDEFG"))
-	assert.Equal(t, b.Value(3), []byte{})
-
 	values := [][]byte{
 		[]byte("7654321"),
 		nil,
@@ -54,10 +49,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
 	assert.Equal(t, 7, b.Len(), "unexpected Len()")
 	assert.Equal(t, 3, b.NullN(), "unexpected NullN()")
 
-	assert.Equal(t, []byte("7654321"), b.Value(4))
-	assert.Equal(t, []byte{}, b.Value(5))
-	assert.Equal(t, []byte("AZERTYU"), b.Value(6))
-
 	a := b.NewFixedSizeBinaryArray()
 
 	// check state of builder after NewFixedSizeBinaryArray
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index 504ade6..b7daf6f 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -39,6 +39,7 @@ func init() {
 	Records["strings"] = makeStringsRecords()
 	Records["fixed_size_lists"] = makeFixedSizeListsRecords()
 	Records["fixed_width_types"] = makeFixedWidthTypesRecords()
+	Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
 
 	for k := range Records {
 		RecordNames = append(RecordNames, k)
@@ -398,6 +399,45 @@ func makeFixedWidthTypesRecords() []array.Record {
 	return recs
 }
 
+type fsb3 string
+
+func makeFixedSizeBinariesRecords() []array.Record {
+	mem := memory.NewGoAllocator()
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "fixed_size_binary_3", Type: &arrow.FixedSizeBinaryType{ByteWidth: 3}, Nullable: true},
+		}, nil,
+	)
+
+	mask := []bool{true, false, false, true, true}
+	chunks := [][]array.Interface{
+		[]array.Interface{
+			arrayOf(mem, []fsb3{"001", "002", "003", "004", "005"}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []fsb3{"011", "012", "013", "014", "015"}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []fsb3{"021", "022", "023", "024", "025"}, mask),
+		},
+	}
+
+	defer func() {
+		for _, chunk := range chunks {
+			for _, col := range chunk {
+				col.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, len(chunks))
+	for i, chunk := range chunks {
+		recs[i] = array.NewRecord(schema, chunk, -1)
+	}
+
+	return recs
+}
+
 func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
 	if mem == nil {
 		mem = memory.NewGoAllocator()
@@ -567,6 +607,16 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
 		bldr.AppendValues(a, valids)
 		return bldr.NewArray()
 
+	case []fsb3:
+		bldr := array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: 3})
+		defer bldr.Release()
+		vs := make([][]byte, len(a))
+		for i, v := range a {
+			vs[i] = []byte(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
 	default:
 		panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
 	}
diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go
index 5a8f031..3f9c3e7 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go
@@ -149,6 +149,16 @@ record 3...
   col[7] "date64s": [-22 (null) (null) 21 22]
 `,
 		},
+		{
+			name: "fixed_size_binaries",
+			want: `record 1...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -450,6 +460,28 @@ record 3/3...
   col[7] "date64s": [-22 (null) (null) 21 22]
 `,
 		},
+		{
+			stream: true,
+			name:   "fixed_size_binaries",
+			want: `record 1...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+		},
+		{
+			name: "fixed_size_binaries",
+			want: `version: V4
+record 1/3...
+  col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2/3...
+  col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3/3...
+  col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+		},
 	} {
 		t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go
index 64cd478..0488eae 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go
@@ -102,6 +102,14 @@ records: 3
 records: 3
 `,
 		},
+		{
+			name: "fixed_size_binaries",
+			want: `schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -239,6 +247,24 @@ schema:
 records: 4
 `,
 		},
+		{
+			stream: true,
+			name:   "fixed_size_binaries",
+			want: `schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+		},
+		{
+			name: "fixed_size_binaries",
+			want: `version: V4
+schema:
+  fields: 1
+    - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+		},
 	} {
 		t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) {
 			mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index b8f08cc..5638f8b 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -450,7 +450,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
 
 func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
 	field, buffers := ctx.loadCommon(2)
-	buffers = append(buffers, nil, ctx.buffer())
+	buffers = append(buffers, ctx.buffer())
 
 	data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
 	defer data.Release()
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e252023..e5797ba 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -237,12 +237,19 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 	case arrow.FixedWidthDataType:
 		data := arr.Data()
 		values := data.Buffers()[1]
-		typeWidth := dtype.BitWidth() / 8
-		minLength := paddedLength(int64(arr.Len())*int64(typeWidth), kArrowAlignment)
+		arrLen := int64(arr.Len())
+		typeWidth := int64(dtype.BitWidth() / 8)
+		minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)
 
 		switch {
 		case needTruncate(int64(data.Offset()), values, minLength):
-			panic("not implemented") // FIXME(sbinet) writer.cc:212
+			// non-zero offset: slice the buffer
+			offset := int64(data.Offset()) * typeWidth
+			// send padding if available
+			len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(data.Len())-offset)
+			data = array.NewSliceData(data, offset, offset+len)
+			defer data.Release()
+			values = data.Buffers()[1]
 		default:
 			if values != nil {
 				values.Retain()
@@ -268,7 +275,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
 			panic("not implemented") // FIXME(sbinet) writer.cc:264
 		default:
-			values.Retain()
+			if values != nil {
+				values.Retain()
+			}
 		}
 		p.body = append(p.body, voffsets)
 		p.body = append(p.body, values)
@@ -291,7 +300,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 		case needTruncate(int64(data.Offset()), values, totalDataBytes):
 			panic("not implemented") // FIXME(sbinet) writer.cc:264
 		default:
-			values.Retain()
+			if values != nil {
+				values.Retain()
+			}
 		}
 		p.body = append(p.body, voffsets)
 		p.body = append(p.body, values)
@@ -430,3 +441,10 @@ func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool {
 	}
 	return offset != 0 || minLength < int64(buf.Len())
 }
+
+func minI64(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}