You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/22 20:02:26 UTC
[arrow] branch master updated: ARROW-3679: [Go] implement
read/write IPC for Decimal128
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 695d864 ARROW-3679: [Go] implement read/write IPC for Decimal128
695d864 is described below
commit 695d864fc77d97f9899d18b9cc51d74b11561982
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sat Jun 22 15:02:19 2019 -0500
ARROW-3679: [Go] implement read/write IPC for Decimal128
Author: Sebastien Binet <bi...@cern.ch>
Closes #4632 from sbinet/issue-3679 and squashes the following commits:
176176cc9 <Sebastien Binet> ARROW-3679: implement read/write IPC for Decimal128
---
go/arrow/internal/arrdata/arrdata.go | 59 ++++++++++++++++++++++++++++++++++++
go/arrow/ipc/file_reader.go | 1 +
go/arrow/ipc/metadata.go | 16 ++++++++++
go/arrow/ipc/writer.go | 7 ++---
4 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index aeb7ee5..4216619 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/decimal128"
"github.com/apache/arrow/go/arrow/float16"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -42,6 +43,7 @@ func init() {
Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
Records["intervals"] = makeIntervalsRecords()
Records["durations"] = makeDurationsRecords()
+ Records["decimal128"] = makeDecimal128sRecords()
for k := range Records {
RecordNames = append(RecordNames, k)
@@ -575,6 +577,55 @@ func makeDurationsRecords() []array.Record {
return recs
}
+var (
+ decimal128Type = &arrow.Decimal128Type{Precision: 10, Scale: 1}
+)
+
+func makeDecimal128sRecords() []array.Record {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ arrow.Field{Name: "dec128s", Type: decimal128Type, Nullable: true},
+ }, nil,
+ )
+
+ dec128s := func(vs []int64) []decimal128.Num {
+ o := make([]decimal128.Num, len(vs))
+ for i, v := range vs {
+ o[i] = decimal128.New(v, uint64(v))
+ }
+ return o
+ }
+
+ mask := []bool{true, false, false, true, true}
+ chunks := [][]array.Interface{
+ []array.Interface{
+ arrayOf(mem, dec128s([]int64{31, 32, 33, 34, 35}), mask),
+ },
+ []array.Interface{
+ arrayOf(mem, dec128s([]int64{41, 42, 43, 44, 45}), mask),
+ },
+ []array.Interface{
+ arrayOf(mem, dec128s([]int64{51, 52, 53, 54, 55}), mask),
+ },
+ }
+
+ defer func() {
+ for _, chunk := range chunks {
+ for _, col := range chunk {
+ col.Release()
+ }
+ }
+ }()
+
+ recs := make([]array.Record, len(chunks))
+ for i, chunk := range chunks {
+ recs[i] = array.NewRecord(schema, chunk, -1)
+ }
+
+ return recs
+}
+
func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
if mem == nil {
mem = memory.NewGoAllocator()
@@ -665,6 +716,14 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
bldr.AppendValues(a, valids)
return bldr.NewFloat64Array()
+ case []decimal128.Num:
+ bldr := array.NewDecimal128Builder(mem, decimal128Type)
+ defer bldr.Release()
+
+ bldr.AppendValues(a, valids)
+ aa := bldr.NewDecimal128Array()
+ return aa
+
case []string:
bldr := array.NewStringBuilder(mem)
defer bldr.Release()
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 86a372d..9f12384 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -360,6 +360,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
+ *arrow.Decimal128Type,
*arrow.Time32Type, *arrow.Time64Type,
*arrow.TimestampType,
*arrow.Date32Type, *arrow.Date64Type,
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index a0e9364..794a643 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -257,6 +257,13 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
fv.dtype = flatbuf.TypeFloatingPoint
fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))
+ case *arrow.Decimal128Type:
+ fv.dtype = flatbuf.TypeDecimal
+ flatbuf.DecimalStart(fv.b)
+ flatbuf.DecimalAddPrecision(fv.b, dt.Precision)
+ flatbuf.DecimalAddScale(fv.b, dt.Scale)
+ fv.offset = flatbuf.DecimalEnd(fv.b)
+
case *arrow.FixedSizeBinaryType:
fv.dtype = flatbuf.TypeFixedSizeBinary
flatbuf.FixedSizeBinaryStart(fv.b)
@@ -510,6 +517,11 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr
dt.Init(data.Bytes, data.Pos)
return floatFromFB(dt)
+ case flatbuf.TypeDecimal:
+ var dt flatbuf.Decimal
+ dt.Init(data.Bytes, data.Pos)
+ return decimalFromFB(dt)
+
case flatbuf.TypeBinary:
return arrow.BinaryTypes.Binary, nil
@@ -651,6 +663,10 @@ func floatToFB(b *flatbuffers.Builder, bw int32) flatbuffers.UOffsetT {
}
}
+func decimalFromFB(data flatbuf.Decimal) (arrow.DataType, error) {
+ return &arrow.Decimal128Type{Precision: data.Precision(), Scale: data.Scale()}, nil
+}
+
func timeFromFB(data flatbuf.Time) (arrow.DataType, error) {
bw := data.BitWidth()
unit := unitFromFB(data.Unit())
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e5797ba..e526b65 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -250,10 +250,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
data = array.NewSliceData(data, offset, offset+len)
defer data.Release()
values = data.Buffers()[1]
- default:
- if values != nil {
- values.Retain()
- }
+ }
+ if values != nil {
+ values.Retain()
}
p.body = append(p.body, values)