You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/06/11 16:45:48 UTC
[arrow] branch master updated: ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
This is an automated email from the ASF dual-hosted git repository.
sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 71bcfdf ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
71bcfdf is described below
commit 71bcfdfd2bb4f82c9652012001ffed882e7b968d
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Tue Jun 11 18:45:23 2019 +0200
ARROW-5551: [Go] implement FixedSizeArrays with 2-buffers layout
Author: Sebastien Binet <bi...@cern.ch>
Closes #4517 from sbinet/issue-5551 and squashes the following commits:
47162f07a <Sebastien Binet> ARROW-5551: implement FixedSizeArrays with 2-buffers layout
---
go/arrow/array/array_test.go | 2 +-
go/arrow/array/fixedsize_binary.go | 45 +++++++----------------
go/arrow/array/fixedsize_binary_test.go | 8 +++--
go/arrow/array/fixedsize_binarybuilder.go | 50 +++++++-------------------
go/arrow/array/fixedsize_binarybuilder_test.go | 9 -----
go/arrow/internal/arrdata/arrdata.go | 50 ++++++++++++++++++++++++++
go/arrow/ipc/cmd/arrow-cat/main_test.go | 32 +++++++++++++++++
go/arrow/ipc/cmd/arrow-ls/main_test.go | 26 ++++++++++++++
go/arrow/ipc/file_reader.go | 2 +-
go/arrow/ipc/writer.go | 28 ++++++++++++---
10 files changed, 165 insertions(+), 87 deletions(-)
diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index 3a3407f..884bb8d 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -63,7 +63,7 @@ func TestMakeFromData(t *testing.T) {
{name: "timestamp", d: &testDataType{arrow.TIMESTAMP}},
{name: "time32", d: &testDataType{arrow.TIME32}},
{name: "time64", d: &testDataType{arrow.TIME64}},
- {name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}, size: 3},
+ {name: "fixed_size_binary", d: &testDataType{arrow.FIXED_SIZE_BINARY}},
{name: "list", d: &testDataType{arrow.LIST}, child: []*array.Data{
array.NewData(&testDataType{arrow.INT64}, 0, make([]*memory.Buffer, 4), nil, 0, 0),
diff --git a/go/arrow/array/fixedsize_binary.go b/go/arrow/array/fixedsize_binary.go
index db24a86..5134302 100644
--- a/go/arrow/array/fixedsize_binary.go
+++ b/go/arrow/array/fixedsize_binary.go
@@ -26,13 +26,14 @@ import (
// A type which represents an immutable sequence of fixed-length binary strings.
type FixedSizeBinary struct {
array
- valueOffsets []int32
- valueBytes []byte
+
+ valueBytes []byte
+ bytewidth int32
}
// NewFixedSizeBinaryData constructs a new fixed-size binary array from data.
func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
- a := &FixedSizeBinary{}
+ a := &FixedSizeBinary{bytewidth: int32(data.DataType().(arrow.FixedWidthDataType).BitWidth() / 8)}
a.refCount = 1
a.setData(data)
return a
@@ -41,14 +42,14 @@ func NewFixedSizeBinaryData(data *Data) *FixedSizeBinary {
// Value returns the fixed-size slice at index i. This value should not be mutated.
func (a *FixedSizeBinary) Value(i int) []byte {
i += a.array.data.offset
- return a.valueBytes[a.valueOffsets[i]:a.valueOffsets[i+1]]
+ var (
+ bw = int(a.bytewidth)
+ beg = i * bw
+ end = (i + 1) * bw
+ )
+ return a.valueBytes[beg:end]
}
-func (a *FixedSizeBinary) ValueOffset(i int) int { return int(a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueLen(i int) int { return int(a.valueOffsets[i+1] - a.valueOffsets[i]) }
-func (a *FixedSizeBinary) ValueOffsets() []int32 { return a.valueOffsets }
-func (a *FixedSizeBinary) ValueBytes() []byte { return a.valueBytes }
-
func (a *FixedSizeBinary) String() string {
o := new(strings.Builder)
o.WriteString("[")
@@ -68,32 +69,12 @@ func (a *FixedSizeBinary) String() string {
}
func (a *FixedSizeBinary) setData(data *Data) {
- if len(data.buffers) != 3 {
- panic("len(data.buffers) != 3")
- }
-
a.array.setData(data)
-
- if valueBytes := data.buffers[2]; valueBytes != nil {
- a.valueBytes = valueBytes.Bytes()
+ vals := data.buffers[1]
+ if vals != nil {
+ a.valueBytes = vals.Bytes()
}
- switch valueOffsets := data.buffers[1]; valueOffsets {
- case nil:
- // re-compute offsets
- offsets := make([]int32, a.Len()+1)
- bw := a.DataType().(arrow.FixedWidthDataType).BitWidth() / 8
- for i := range offsets[1:] {
- var delta int32
- if a.IsValid(i) {
- delta = int32(bw)
- }
- offsets[i+1] = offsets[i] + delta
- }
- a.valueOffsets = offsets
- default:
- a.valueOffsets = arrow.Int32Traits.CastFromBytes(valueOffsets.Bytes())
- }
}
var (
diff --git a/go/arrow/array/fixedsize_binary_test.go b/go/arrow/array/fixedsize_binary_test.go
index fdb0fbf..4d2d724 100644
--- a/go/arrow/array/fixedsize_binary_test.go
+++ b/go/arrow/array/fixedsize_binary_test.go
@@ -33,6 +33,8 @@ func TestFixedSizeBinary(t *testing.T) {
dtype := arrow.FixedSizeBinaryType{ByteWidth: 7}
b := array.NewFixedSizeBinaryBuilder(mem, &dtype)
+ zero := make([]byte, dtype.ByteWidth)
+
values := [][]byte{
[]byte("7654321"),
nil,
@@ -48,7 +50,9 @@ func TestFixedSizeBinary(t *testing.T) {
assert.Equal(t, 3, a.Len())
assert.Equal(t, 1, a.NullN())
assert.Equal(t, []byte("7654321"), a.Value(0))
- assert.Equal(t, []byte{}, a.Value(1))
+ assert.Equal(t, zero, a.Value(1))
+ assert.Equal(t, true, a.IsNull(1))
+ assert.Equal(t, false, a.IsValid(1))
assert.Equal(t, []byte("AZERTYU"), a.Value(2))
a.Release()
@@ -58,7 +62,7 @@ func TestFixedSizeBinary(t *testing.T) {
assert.Equal(t, 3, a.Len())
assert.Equal(t, 1, a.NullN())
assert.Equal(t, []byte("7654321"), a.Value(0))
- assert.Equal(t, []byte{}, a.Value(1))
+ assert.Equal(t, zero, a.Value(1))
assert.Equal(t, []byte("AZERTYU"), a.Value(2))
a.Release()
diff --git a/go/arrow/array/fixedsize_binarybuilder.go b/go/arrow/array/fixedsize_binarybuilder.go
index 053a192..8a2f65f 100644
--- a/go/arrow/array/fixedsize_binarybuilder.go
+++ b/go/arrow/array/fixedsize_binarybuilder.go
@@ -17,6 +17,7 @@
package array
import (
+ "fmt"
"sync/atomic"
"github.com/apache/arrow/go/arrow"
@@ -28,16 +29,14 @@ import (
type FixedSizeBinaryBuilder struct {
builder
- dtype *arrow.FixedSizeBinaryType
- offsets *int32BufferBuilder
- values *byteBufferBuilder
+ dtype *arrow.FixedSizeBinaryType
+ values *byteBufferBuilder
}
func NewFixedSizeBinaryBuilder(mem memory.Allocator, dtype *arrow.FixedSizeBinaryType) *FixedSizeBinaryBuilder {
b := &FixedSizeBinaryBuilder{
builder: builder{refCount: 1, mem: mem},
dtype: dtype,
- offsets: newInt32BufferBuilder(mem),
values: newByteBufferBuilder(mem),
}
return b
@@ -54,10 +53,6 @@ func (b *FixedSizeBinaryBuilder) Release() {
b.nullBitmap.Release()
b.nullBitmap = nil
}
- if b.offsets != nil {
- b.offsets.Release()
- b.offsets = nil
- }
if b.values != nil {
b.values.Release()
b.values = nil
@@ -72,14 +67,13 @@ func (b *FixedSizeBinaryBuilder) Append(v []byte) {
}
b.Reserve(1)
- b.appendNextOffset()
b.values.Append(v)
b.UnsafeAppendBoolToBitmap(true)
}
func (b *FixedSizeBinaryBuilder) AppendNull() {
b.Reserve(1)
- b.appendNextOffset()
+ b.values.Advance(b.dtype.ByteWidth)
b.UnsafeAppendBoolToBitmap(false)
}
@@ -97,25 +91,19 @@ func (b *FixedSizeBinaryBuilder) AppendValues(v [][]byte, valid []bool) {
b.Reserve(len(v))
for _, vv := range v {
- b.appendNextOffset()
- b.values.Append(vv)
+ switch len(vv) {
+ case 0:
+ b.values.Advance(b.dtype.ByteWidth)
+ case b.dtype.ByteWidth:
+ b.values.Append(vv)
+ default:
+ panic(fmt.Errorf("array: invalid binary length (got=%d, want=%d)", len(vv), b.dtype.ByteWidth))
+ }
}
b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
}
-func (b *FixedSizeBinaryBuilder) Value(i int) []byte {
- offsets := b.offsets.Values()
- start := int(offsets[i])
- var end int
- if i == (b.length - 1) {
- end = b.values.Len()
- } else {
- end = int(offsets[i+1])
- }
- return b.values.Bytes()[start:end]
-}
-
func (b *FixedSizeBinaryBuilder) init(capacity int) {
b.builder.init(capacity)
b.values.resize(capacity * b.dtype.ByteWidth)
@@ -130,7 +118,6 @@ func (b *FixedSizeBinaryBuilder) Reserve(n int) {
// Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(),
// additional memory will be allocated. If n is smaller, the allocated memory may reduced.
func (b *FixedSizeBinaryBuilder) Resize(n int) {
- b.offsets.resize((n + 1) * arrow.Int32SizeBytes)
b.builder.resize(n, b.init)
}
@@ -150,29 +137,18 @@ func (b *FixedSizeBinaryBuilder) NewFixedSizeBinaryArray() (a *FixedSizeBinary)
}
func (b *FixedSizeBinaryBuilder) newData() (data *Data) {
- b.appendNextOffset()
values := b.values.Finish()
- offsets := b.offsets.Finish()
- data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, offsets, values}, nil, b.nulls, 0)
+ data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, values}, nil, b.nulls, 0)
if values != nil {
values.Release()
}
- if offsets != nil {
- offsets.Release()
- }
b.builder.reset()
return
}
-func (b *FixedSizeBinaryBuilder) appendNextOffset() {
- numBytes := b.values.Len()
- // TODO(alexandre): check binaryArrayMaximumCapacity?
- b.offsets.AppendValue(int32(numBytes))
-}
-
var (
_ Builder = (*FixedSizeBinaryBuilder)(nil)
)
diff --git a/go/arrow/array/fixedsize_binarybuilder_test.go b/go/arrow/array/fixedsize_binarybuilder_test.go
index f50e1b0..08740c5 100644
--- a/go/arrow/array/fixedsize_binarybuilder_test.go
+++ b/go/arrow/array/fixedsize_binarybuilder_test.go
@@ -39,11 +39,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
assert.Equal(t, 4, b.Len(), "unexpected Len()")
assert.Equal(t, 2, b.NullN(), "unexpected NullN()")
- assert.Equal(t, b.Value(0), []byte("1234567"))
- assert.Equal(t, b.Value(1), []byte{})
- assert.Equal(t, b.Value(2), []byte("ABCDEFG"))
- assert.Equal(t, b.Value(3), []byte{})
-
values := [][]byte{
[]byte("7654321"),
nil,
@@ -54,10 +49,6 @@ func TestFixedSizeBinaryBuilder(t *testing.T) {
assert.Equal(t, 7, b.Len(), "unexpected Len()")
assert.Equal(t, 3, b.NullN(), "unexpected NullN()")
- assert.Equal(t, []byte("7654321"), b.Value(4))
- assert.Equal(t, []byte{}, b.Value(5))
- assert.Equal(t, []byte("AZERTYU"), b.Value(6))
-
a := b.NewFixedSizeBinaryArray()
// check state of builder after NewFixedSizeBinaryArray
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index 504ade6..b7daf6f 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -39,6 +39,7 @@ func init() {
Records["strings"] = makeStringsRecords()
Records["fixed_size_lists"] = makeFixedSizeListsRecords()
Records["fixed_width_types"] = makeFixedWidthTypesRecords()
+ Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
for k := range Records {
RecordNames = append(RecordNames, k)
@@ -398,6 +399,45 @@ func makeFixedWidthTypesRecords() []array.Record {
return recs
}
+type fsb3 string
+
+func makeFixedSizeBinariesRecords() []array.Record {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ arrow.Field{Name: "fixed_size_binary_3", Type: &arrow.FixedSizeBinaryType{ByteWidth: 3}, Nullable: true},
+ }, nil,
+ )
+
+ mask := []bool{true, false, false, true, true}
+ chunks := [][]array.Interface{
+ []array.Interface{
+ arrayOf(mem, []fsb3{"001", "002", "003", "004", "005"}, mask),
+ },
+ []array.Interface{
+ arrayOf(mem, []fsb3{"011", "012", "013", "014", "015"}, mask),
+ },
+ []array.Interface{
+ arrayOf(mem, []fsb3{"021", "022", "023", "024", "025"}, mask),
+ },
+ }
+
+ defer func() {
+ for _, chunk := range chunks {
+ for _, col := range chunk {
+ col.Release()
+ }
+ }
+ }()
+
+ recs := make([]array.Record, len(chunks))
+ for i, chunk := range chunks {
+ recs[i] = array.NewRecord(schema, chunk, -1)
+ }
+
+ return recs
+}
+
func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
if mem == nil {
mem = memory.NewGoAllocator()
@@ -567,6 +607,16 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
bldr.AppendValues(a, valids)
return bldr.NewArray()
+ case []fsb3:
+ bldr := array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: 3})
+ defer bldr.Release()
+ vs := make([][]byte, len(a))
+ for i, v := range a {
+ vs[i] = []byte(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
default:
panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
}
diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go
index 5a8f031..3f9c3e7 100644
--- a/go/arrow/ipc/cmd/arrow-cat/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go
@@ -149,6 +149,16 @@ record 3...
col[7] "date64s": [-22 (null) (null) 21 22]
`,
},
+ {
+ name: "fixed_size_binaries",
+ want: `record 1...
+ col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+ col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+ col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+ },
} {
t.Run(tc.name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -450,6 +460,28 @@ record 3/3...
col[7] "date64s": [-22 (null) (null) 21 22]
`,
},
+ {
+ stream: true,
+ name: "fixed_size_binaries",
+ want: `record 1...
+ col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2...
+ col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3...
+ col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+ },
+ {
+ name: "fixed_size_binaries",
+ want: `version: V4
+record 1/3...
+ col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"]
+record 2/3...
+ col[0] "fixed_size_binary_3": ["011" (null) (null) "014" "015"]
+record 3/3...
+ col[0] "fixed_size_binary_3": ["021" (null) (null) "024" "025"]
+`,
+ },
} {
t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go
index 64cd478..0488eae 100644
--- a/go/arrow/ipc/cmd/arrow-ls/main_test.go
+++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go
@@ -102,6 +102,14 @@ records: 3
records: 3
`,
},
+ {
+ name: "fixed_size_binaries",
+ want: `schema:
+ fields: 1
+ - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+ },
} {
t.Run(tc.name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -239,6 +247,24 @@ schema:
records: 4
`,
},
+ {
+ stream: true,
+ name: "fixed_size_binaries",
+ want: `schema:
+ fields: 1
+ - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+ },
+ {
+ name: "fixed_size_binaries",
+ want: `version: V4
+schema:
+ fields: 1
+ - fixed_size_binary_3: type=fixed_size_binary[3], nullable
+records: 3
+`,
+ },
} {
t.Run(fmt.Sprintf("%s-stream=%v", tc.name, tc.stream), func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index b8f08cc..5638f8b 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -450,7 +450,7 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
field, buffers := ctx.loadCommon(2)
- buffers = append(buffers, nil, ctx.buffer())
+ buffers = append(buffers, ctx.buffer())
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e252023..e5797ba 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -237,12 +237,19 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
case arrow.FixedWidthDataType:
data := arr.Data()
values := data.Buffers()[1]
- typeWidth := dtype.BitWidth() / 8
- minLength := paddedLength(int64(arr.Len())*int64(typeWidth), kArrowAlignment)
+ arrLen := int64(arr.Len())
+ typeWidth := int64(dtype.BitWidth() / 8)
+ minLength := paddedLength(arrLen*typeWidth, kArrowAlignment)
switch {
case needTruncate(int64(data.Offset()), values, minLength):
- panic("not implemented") // FIXME(sbinet) writer.cc:212
+ // non-zero offset: slice the buffer
+ offset := int64(data.Offset()) * typeWidth
+ // send padding if available
+ len := minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(data.Len())-offset)
+ data = array.NewSliceData(data, offset, offset+len)
+ defer data.Release()
+ values = data.Buffers()[1]
default:
if values != nil {
values.Retain()
@@ -268,7 +275,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
case needTruncate(int64(data.Offset()), values, totalDataBytes):
panic("not implemented") // FIXME(sbinet) writer.cc:264
default:
- values.Retain()
+ if values != nil {
+ values.Retain()
+ }
}
p.body = append(p.body, voffsets)
p.body = append(p.body, values)
@@ -291,7 +300,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
case needTruncate(int64(data.Offset()), values, totalDataBytes):
panic("not implemented") // FIXME(sbinet) writer.cc:264
default:
- values.Retain()
+ if values != nil {
+ values.Retain()
+ }
}
p.body = append(p.body, voffsets)
p.body = append(p.body, values)
@@ -430,3 +441,10 @@ func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool {
}
return offset != 0 || minLength < int64(buf.Len())
}
+
+func minI64(a, b int64) int64 {
+ if a < b {
+ return a
+ }
+ return b
+}