You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/06/14 16:25:27 UTC

[arrow] branch master updated: ARROW-5591: [Go] implement read/write IPC for Duration & Intervals

This is an automated email from the ASF dual-hosted git repository.

sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dee0c1f  ARROW-5591: [Go] implement read/write IPC for Duration & Intervals
dee0c1f is described below

commit dee0c1f0d404192d3ba222fc4be7aee88ad3c16b
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Fri Jun 14 18:25:07 2019 +0200

    ARROW-5591: [Go] implement read/write IPC for Duration & Intervals
    
    Author: Sebastien Binet <bi...@cern.ch>
    
    Closes #4564 from sbinet/issue-5591 and squashes the following commits:
    
    c2e638b28 <Sebastien Binet> go/arrow/ipc: implement read/write IPC for Duration
    375844faf <Sebastien Binet> ARROW-5591:  implement read/write IPC for Duration & Intervals
---
 go/arrow/internal/arrdata/arrdata.go | 155 +++++++++++++++++++++++++++++++++++
 go/arrow/ipc/file_reader.go          |   4 +-
 go/arrow/ipc/metadata.go             |  53 ++++++++++++
 3 files changed, 211 insertions(+), 1 deletion(-)

diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index e76d68a..aeb7ee5 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -40,6 +40,8 @@ func init() {
 	Records["fixed_size_lists"] = makeFixedSizeListsRecords()
 	Records["fixed_width_types"] = makeFixedWidthTypesRecords()
 	Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
+	Records["intervals"] = makeIntervalsRecords()
+	Records["durations"] = makeDurationsRecords()
 
 	for k := range Records {
 		RecordNames = append(RecordNames, k)
@@ -474,6 +476,105 @@ func makeFixedSizeBinariesRecords() []array.Record {
 	return recs
 }
 
+func makeIntervalsRecords() []array.Record { // three records, each with a nullable MonthInterval and a nullable DayTimeInterval column of 5 rows
+	mem := memory.NewGoAllocator()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "months", Type: arrow.FixedWidthTypes.MonthInterval, Nullable: true},
+			arrow.Field{Name: "days", Type: arrow.FixedWidthTypes.DayTimeInterval, Nullable: true},
+		}, nil,
+	)
+
+	mask := []bool{true, false, false, true, true} // per-row validity passed to AppendValues via arrayOf (presumably true = valid)
+	chunks := [][]array.Interface{ // one inner slice of columns per output record
+		[]array.Interface{
+			arrayOf(mem, []arrow.MonthInterval{1, 2, 3, 4, 5}, mask),
+			arrayOf(mem, []arrow.DayTimeInterval{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []arrow.MonthInterval{11, 12, 13, 14, 15}, mask),
+			arrayOf(mem, []arrow.DayTimeInterval{{11, 11}, {12, 12}, {13, 13}, {14, 14}, {15, 15}}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []arrow.MonthInterval{21, 22, 23, 24, 25}, mask),
+			arrayOf(mem, []arrow.DayTimeInterval{{21, 21}, {22, 22}, {23, 23}, {24, 24}, {25, 25}}, mask),
+		},
+	}
+
+	defer func() { // drop this function's column references on return; assumes NewRecord retains its own — confirm
+		for _, chunk := range chunks {
+			for _, col := range chunk {
+				col.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, len(chunks))
+	for i, chunk := range chunks {
+		recs[i] = array.NewRecord(schema, chunk, -1) // -1: presumably lets NewRecord take the row count from the columns
+	}
+
+	return recs
+}
+
+type ( // distinct element types so arrayOf's type switch can select the Duration unit
+	duration_s  arrow.Duration // built with arrow.Second
+	duration_ms arrow.Duration // built with arrow.Millisecond
+	duration_us arrow.Duration // built with arrow.Microsecond
+	duration_ns arrow.Duration // built with arrow.Nanosecond
+)
+
+func makeDurationsRecords() []array.Record { // three records, each with one nullable Duration column per unit (s, ms, us, ns) of 5 rows
+	mem := memory.NewGoAllocator()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			arrow.Field{Name: "durations-s", Type: &arrow.DurationType{Unit: arrow.Second}, Nullable: true},
+			arrow.Field{Name: "durations-ms", Type: &arrow.DurationType{Unit: arrow.Millisecond}, Nullable: true},
+			arrow.Field{Name: "durations-us", Type: &arrow.DurationType{Unit: arrow.Microsecond}, Nullable: true},
+			arrow.Field{Name: "durations-ns", Type: &arrow.DurationType{Unit: arrow.Nanosecond}, Nullable: true},
+		}, nil,
+	)
+
+	mask := []bool{true, false, false, true, true} // per-row validity passed to AppendValues via arrayOf (presumably true = valid)
+	chunks := [][]array.Interface{ // one inner slice of columns per output record; element type picks the unit
+		[]array.Interface{
+			arrayOf(mem, []duration_s{1, 2, 3, 4, 5}, mask),
+			arrayOf(mem, []duration_ms{1, 2, 3, 4, 5}, mask),
+			arrayOf(mem, []duration_us{1, 2, 3, 4, 5}, mask),
+			arrayOf(mem, []duration_ns{1, 2, 3, 4, 5}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []duration_s{11, 12, 13, 14, 15}, mask),
+			arrayOf(mem, []duration_ms{11, 12, 13, 14, 15}, mask),
+			arrayOf(mem, []duration_us{11, 12, 13, 14, 15}, mask),
+			arrayOf(mem, []duration_ns{11, 12, 13, 14, 15}, mask),
+		},
+		[]array.Interface{
+			arrayOf(mem, []duration_s{21, 22, 23, 24, 25}, mask),
+			arrayOf(mem, []duration_ms{21, 22, 23, 24, 25}, mask),
+			arrayOf(mem, []duration_us{21, 22, 23, 24, 25}, mask),
+			arrayOf(mem, []duration_ns{21, 22, 23, 24, 25}, mask),
+		},
+	}
+
+	defer func() { // drop this function's column references on return; assumes NewRecord retains its own — confirm
+		for _, chunk := range chunks {
+			for _, col := range chunk {
+				col.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, len(chunks))
+	for i, chunk := range chunks {
+		recs[i] = array.NewRecord(schema, chunk, -1) // -1: presumably lets NewRecord take the row count from the columns
+	}
+
+	return recs
+}
+
 func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
 	if mem == nil {
 		mem = memory.NewGoAllocator()
@@ -653,6 +754,60 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
 		bldr.AppendValues(vs, valids)
 		return bldr.NewArray()
 
+	case []arrow.MonthInterval:
+		bldr := array.NewMonthIntervalBuilder(mem)
+		defer bldr.Release()
+
+		bldr.AppendValues(a, valids)
+		return bldr.NewArray()
+
+	case []arrow.DayTimeInterval:
+		bldr := array.NewDayTimeIntervalBuilder(mem)
+		defer bldr.Release()
+
+		bldr.AppendValues(a, valids)
+		return bldr.NewArray()
+
+	case []duration_s:
+		bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Second})
+		defer bldr.Release()
+		vs := make([]arrow.Duration, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Duration(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []duration_ms:
+		bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Millisecond})
+		defer bldr.Release()
+		vs := make([]arrow.Duration, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Duration(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []duration_us:
+		bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Microsecond})
+		defer bldr.Release()
+		vs := make([]arrow.Duration, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Duration(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
+	case []duration_ns:
+		bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Nanosecond})
+		defer bldr.Release()
+		vs := make([]arrow.Duration, len(a))
+		for i, v := range a {
+			vs[i] = arrow.Duration(v)
+		}
+		bldr.AppendValues(vs, valids)
+		return bldr.NewArray()
+
 	default:
 		panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
 	}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 5638f8b..86a372d 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -362,7 +362,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
 		*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
 		*arrow.Time32Type, *arrow.Time64Type,
 		*arrow.TimestampType,
-		*arrow.Date32Type, *arrow.Date64Type:
+		*arrow.Date32Type, *arrow.Date64Type,
+		*arrow.MonthIntervalType, *arrow.DayTimeIntervalType,
+		*arrow.DurationType:
 		return ctx.loadPrimitive(dt)
 
 	case *arrow.BinaryType, *arrow.StringType:
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 91db1f2..a0e9364 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -334,6 +334,25 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
 		flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
 		fv.offset = flatbuf.FixedSizeListEnd(fv.b)
 
+	case *arrow.MonthIntervalType:
+		fv.dtype = flatbuf.TypeInterval
+		flatbuf.IntervalStart(fv.b)
+		flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH)
+		fv.offset = flatbuf.IntervalEnd(fv.b)
+
+	case *arrow.DayTimeIntervalType:
+		fv.dtype = flatbuf.TypeInterval
+		flatbuf.IntervalStart(fv.b)
+		flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME)
+		fv.offset = flatbuf.IntervalEnd(fv.b)
+
+	case *arrow.DurationType:
+		fv.dtype = flatbuf.TypeDuration
+		unit := unitToFB(dt.Unit)
+		flatbuf.DurationStart(fv.b)
+		flatbuf.DurationAddUnit(fv.b, unit)
+		fv.offset = flatbuf.DurationEnd(fv.b)
+
 	default:
 		err := errors.Errorf("arrow/ipc: invalid data type %v", dt)
 		panic(err) // FIXME(sbinet): implement all data-types.
@@ -537,6 +556,16 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr
 		dt.Init(data.Bytes, data.Pos)
 		return dateFromFB(dt)
 
+	case flatbuf.TypeInterval:
+		var dt flatbuf.Interval
+		dt.Init(data.Bytes, data.Pos)
+		return intervalFromFB(dt)
+
+	case flatbuf.TypeDuration:
+		var dt flatbuf.Duration
+		dt.Init(data.Bytes, data.Pos)
+		return durationFromFB(dt)
+
 	default:
 		// FIXME(sbinet): implement all the other types.
 		panic(fmt.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ]))
@@ -666,6 +695,30 @@ func dateFromFB(data flatbuf.Date) (arrow.DataType, error) {
 	return nil, errors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit())
 }
 
+func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) { // maps a flatbuffer Interval unit to the predefined Go arrow interval type
+	switch data.Unit() {
+	case flatbuf.IntervalUnitYEAR_MONTH:
+		return arrow.FixedWidthTypes.MonthInterval, nil
+	case flatbuf.IntervalUnitDAY_TIME:
+		return arrow.FixedWidthTypes.DayTimeInterval, nil
+	}
+	return nil, errors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit()) // any other unit is rejected
+}
+
+func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) { // maps a flatbuffer Duration time unit to the predefined Go arrow Duration type
+	switch data.Unit() {
+	case flatbuf.TimeUnitSECOND:
+		return arrow.FixedWidthTypes.Duration_s, nil
+	case flatbuf.TimeUnitMILLISECOND:
+		return arrow.FixedWidthTypes.Duration_ms, nil
+	case flatbuf.TimeUnitMICROSECOND:
+		return arrow.FixedWidthTypes.Duration_us, nil
+	case flatbuf.TimeUnitNANOSECOND:
+		return arrow.FixedWidthTypes.Duration_ns, nil
+	}
+	return nil, errors.Errorf("arrow/ipc: Duration type with %d unit not implemented", data.Unit()) // units other than s/ms/us/ns are rejected
+}
+
 type customMetadataer interface {
 	CustomMetadataLength() int
 	CustomMetadata(*flatbuf.KeyValue, int) bool