You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink labelled "here") was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/22 20:02:26 UTC

[arrow] branch master updated: ARROW-3679: [Go] implement read/write IPC for Decimal128

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 695d864  ARROW-3679: [Go] implement read/write IPC for Decimal128
695d864 is described below

commit 695d864fc77d97f9899d18b9cc51d74b11561982
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Sat Jun 22 15:02:19 2019 -0500

    ARROW-3679: [Go] implement read/write IPC for Decimal128
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4632 from sbinet/issue-3679 and squashes the following commits:
    
    176176cc9 <Sebastien Binet> ARROW-3679:  implement read/write IPC for Decimal128
---
 go/arrow/internal/arrdata/arrdata.go | 59 ++++++++++++++++++++++++++++++++++++
 go/arrow/ipc/file_reader.go          |  1 +
 go/arrow/ipc/metadata.go             | 16 ++++++++++
 go/arrow/ipc/writer.go               |  7 ++---
 4 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index aeb7ee5..4216619 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/apache/arrow/go/arrow"
 	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/decimal128"
 	"github.com/apache/arrow/go/arrow/float16"
 	"github.com/apache/arrow/go/arrow/memory"
 )
@@ -42,6 +43,7 @@ func init() {
 	Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
 	Records["intervals"] = makeIntervalsRecords()
 	Records["durations"] = makeDurationsRecords()
+	Records["decimal128"] = makeDecimal128sRecords()
 
 	for k := range Records {
 		RecordNames = append(RecordNames, k)
@@ -575,6 +577,55 @@ func makeDurationsRecords() []array.Record {
 	return recs
 }
 
+var (
+	decimal128Type = &arrow.Decimal128Type{Precision: 10, Scale: 1}
+)
+
+func makeDecimal128sRecords() []array.Record {
+	mem := memory.NewGoAllocator()
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "dec128s", Type: decimal128Type, Nullable: true},
+		}, nil,
+	)
+
+	dec128s := func(vs []int64) []decimal128.Num {
+		o := make([]decimal128.Num, len(vs))
+		for i, v := range vs {
+			o[i] = decimal128.New(v, uint64(v))
+		}
+		return o
+	}
+
+	mask := []bool{true, false, false, true, true}
+	chunks := [][]array.Interface{
+		[]array.Interface{
+			arrayOf(mem, dec128s([]int64{31, 32, 33, 34, 35}), mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, dec128s([]int64{41, 42, 43, 44, 45}), mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, dec128s([]int64{51, 52, 53, 54, 55}), mask),
+		},
+	}
+
+	defer func() {
+		for _, chunk := range chunks {
+			for _, col := range chunk {
+				col.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, len(chunks))
+	for i, chunk := range chunks {
+		recs[i] = array.NewRecord(schema, chunk, -1)
+	}
+
+	return recs
+}
+
 func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
 	if mem == nil {
 		mem = memory.NewGoAllocator()
@@ -665,6 +716,14 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
 		bldr.AppendValues(a, valids)
 		return bldr.NewFloat64Array()
 
+	case []decimal128.Num:
+		bldr := array.NewDecimal128Builder(mem, decimal128Type)
+		defer bldr.Release()
+
+		bldr.AppendValues(a, valids)
+		aa := bldr.NewDecimal128Array()
+		return aa
+
 	case []string:
 		bldr := array.NewStringBuilder(mem)
 		defer bldr.Release()
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 86a372d..9f12384 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -360,6 +360,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
 		*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
 		*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
 		*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
+		*arrow.Decimal128Type,
 		*arrow.Time32Type, *arrow.Time64Type,
 		*arrow.TimestampType,
 		*arrow.Date32Type, *arrow.Date64Type,
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index a0e9364..794a643 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -257,6 +257,13 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 		fv.dtype = flatbuf.TypeFloatingPoint
 		fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))
 
+	case *arrow.Decimal128Type:
+		fv.dtype = flatbuf.TypeDecimal
+		flatbuf.DecimalStart(fv.b)
+		flatbuf.DecimalAddPrecision(fv.b, dt.Precision)
+		flatbuf.DecimalAddScale(fv.b, dt.Scale)
+		fv.offset = flatbuf.DecimalEnd(fv.b)
+
 	case *arrow.FixedSizeBinaryType:
 		fv.dtype = flatbuf.TypeFixedSizeBinary
 		flatbuf.FixedSizeBinaryStart(fv.b)
@@ -510,6 +517,11 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr
 		dt.Init(data.Bytes, data.Pos)
 		return floatFromFB(dt)
 
+	case flatbuf.TypeDecimal:
+		var dt flatbuf.Decimal
+		dt.Init(data.Bytes, data.Pos)
+		return decimalFromFB(dt)
+
 	case flatbuf.TypeBinary:
 		return arrow.BinaryTypes.Binary, nil
 
@@ -651,6 +663,10 @@ func floatToFB(b *flatbuffers.Builder, bw int32) flatbuffers.UOffsetT {
 	}
 }
 
+func decimalFromFB(data flatbuf.Decimal) (arrow.DataType, error) {
+	return &arrow.Decimal128Type{Precision: data.Precision(), Scale: data.Scale()}, nil
+}
+
 func timeFromFB(data flatbuf.Time) (arrow.DataType, error) {
 	bw := data.BitWidth()
 	unit := unitFromFB(data.Unit())
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index e5797ba..e526b65 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -250,10 +250,9 @@ func (w *recordEncoder) visit(p *payload, arr array.Interface) error {
 			data = array.NewSliceData(data, offset, offset+len)
 			defer data.Release()
 			values = data.Buffers()[1]
-		default:
-			if values != nil {
-				values.Retain()
-			}
+		}
+		if values != nil {
+			values.Retain()
 		}
 		p.body = append(p.body, values)