You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by sb...@apache.org on 2019/06/14 16:25:27 UTC
[arrow] branch master updated: ARROW-5591: [Go] implement read/write IPC for Duration & Intervals
This is an automated email from the ASF dual-hosted git repository.
sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dee0c1f ARROW-5591: [Go] implement read/write IPC for Duration & Intervals
dee0c1f is described below
commit dee0c1f0d404192d3ba222fc4be7aee88ad3c16b
Author: Sebastien Binet <bi...@cern.ch>
AuthorDate: Fri Jun 14 18:25:07 2019 +0200
ARROW-5591: [Go] implement read/write IPC for Duration & Intervals
Author: Sebastien Binet <bi...@cern.ch>
Closes #4564 from sbinet/issue-5591 and squashes the following commits:
c2e638b28 <Sebastien Binet> go/arrow/ipc: implement read/write IPC for Duration
375844faf <Sebastien Binet> ARROW-5591: implement read/write IPC for Duration & Intervals
---
go/arrow/internal/arrdata/arrdata.go | 155 +++++++++++++++++++++++++++++++++++
go/arrow/ipc/file_reader.go | 4 +-
go/arrow/ipc/metadata.go | 53 ++++++++++++
3 files changed, 211 insertions(+), 1 deletion(-)
diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go
index e76d68a..aeb7ee5 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -40,6 +40,8 @@ func init() {
Records["fixed_size_lists"] = makeFixedSizeListsRecords()
Records["fixed_width_types"] = makeFixedWidthTypesRecords()
Records["fixed_size_binaries"] = makeFixedSizeBinariesRecords()
+ Records["intervals"] = makeIntervalsRecords()
+ Records["durations"] = makeDurationsRecords()
for k := range Records {
RecordNames = append(RecordNames, k)
@@ -474,6 +476,105 @@ func makeFixedSizeBinariesRecords() []array.Record {
return recs
}
+// makeIntervalsRecords builds three record batches that share a two-column
+// schema: a nullable "months" column (MonthInterval) and a nullable "days"
+// column (DayTimeInterval). Batch k (k = 0, 1, 2) holds the values
+// 10*k+1 .. 10*k+5; for day-time intervals the same value is used for both
+// the day and millisecond components.
+func makeIntervalsRecords() []array.Record {
+	mem := memory.NewGoAllocator()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "months", Type: arrow.FixedWidthTypes.MonthInterval, Nullable: true},
+			{Name: "days", Type: arrow.FixedWidthTypes.DayTimeInterval, Nullable: true},
+		}, nil,
+	)
+
+	// mask[i] is handed to arrayOf as the validity flag for slot i.
+	mask := []bool{true, false, false, true, true}
+
+	chunks := make([][]array.Interface, 0, 3)
+	for _, base := range []int32{0, 10, 20} {
+		months := make([]arrow.MonthInterval, len(mask))
+		days := make([]arrow.DayTimeInterval, len(mask))
+		for i := range mask {
+			v := base + int32(i) + 1
+			months[i] = arrow.MonthInterval(v)
+			days[i] = arrow.DayTimeInterval{Days: v, Milliseconds: v}
+		}
+		chunks = append(chunks, []array.Interface{
+			arrayOf(mem, months, mask),
+			arrayOf(mem, days, mask),
+		})
+	}
+
+	// Drop this function's references to the columns once the records
+	// have been built (presumably NewRecord retains them — confirm).
+	defer func() {
+		for _, cols := range chunks {
+			for _, arr := range cols {
+				arr.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, 0, len(chunks))
+	for _, cols := range chunks {
+		recs = append(recs, array.NewRecord(schema, cols, -1))
+	}
+	return recs
+}
+
+// Unit-tagged test-data types. Each is a distinct defined type whose
+// underlying type is arrow.Duration, so that arrayOf's type switch can tell
+// them apart and build a Duration array with the matching time unit.
+type duration_s arrow.Duration  // seconds
+type duration_ms arrow.Duration // milliseconds
+type duration_us arrow.Duration // microseconds
+type duration_ns arrow.Duration // nanoseconds
+
+// makeDurationsRecords builds three record batches with one nullable
+// Duration column per time unit (s, ms, us, ns). Batch k (k = 0, 1, 2)
+// holds the values 10*k+1 .. 10*k+5 in every column.
+func makeDurationsRecords() []array.Record {
+	mem := memory.NewGoAllocator()
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "durations-s", Type: &arrow.DurationType{Unit: arrow.Second}, Nullable: true},
+			{Name: "durations-ms", Type: &arrow.DurationType{Unit: arrow.Millisecond}, Nullable: true},
+			{Name: "durations-us", Type: &arrow.DurationType{Unit: arrow.Microsecond}, Nullable: true},
+			{Name: "durations-ns", Type: &arrow.DurationType{Unit: arrow.Nanosecond}, Nullable: true},
+		}, nil,
+	)
+
+	// mask[i] is handed to arrayOf as the validity flag for slot i.
+	mask := []bool{true, false, false, true, true}
+
+	chunks := make([][]array.Interface, 0, 3)
+	for _, base := range []arrow.Duration{0, 10, 20} {
+		secs := make([]duration_s, len(mask))
+		millis := make([]duration_ms, len(mask))
+		micros := make([]duration_us, len(mask))
+		nanos := make([]duration_ns, len(mask))
+		for i := range mask {
+			v := base + arrow.Duration(i+1)
+			secs[i] = duration_s(v)
+			millis[i] = duration_ms(v)
+			micros[i] = duration_us(v)
+			nanos[i] = duration_ns(v)
+		}
+		chunks = append(chunks, []array.Interface{
+			arrayOf(mem, secs, mask),
+			arrayOf(mem, millis, mask),
+			arrayOf(mem, micros, mask),
+			arrayOf(mem, nanos, mask),
+		})
+	}
+
+	// Drop this function's references to the columns once the records
+	// have been built (presumably NewRecord retains them — confirm).
+	defer func() {
+		for _, cols := range chunks {
+			for _, arr := range cols {
+				arr.Release()
+			}
+		}
+	}()
+
+	recs := make([]array.Record, 0, len(chunks))
+	for _, cols := range chunks {
+		recs = append(recs, array.NewRecord(schema, cols, -1))
+	}
+	return recs
+}
+
func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface {
if mem == nil {
mem = memory.NewGoAllocator()
@@ -653,6 +754,60 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
bldr.AppendValues(vs, valids)
return bldr.NewArray()
+ case []arrow.MonthInterval:
+ bldr := array.NewMonthIntervalBuilder(mem)
+ defer bldr.Release()
+
+ bldr.AppendValues(a, valids)
+ return bldr.NewArray()
+
+ case []arrow.DayTimeInterval:
+ bldr := array.NewDayTimeIntervalBuilder(mem)
+ defer bldr.Release()
+
+ bldr.AppendValues(a, valids)
+ return bldr.NewArray()
+
+ case []duration_s:
+ bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Second})
+ defer bldr.Release()
+ vs := make([]arrow.Duration, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Duration(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []duration_ms:
+ bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Millisecond})
+ defer bldr.Release()
+ vs := make([]arrow.Duration, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Duration(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []duration_us:
+ bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Microsecond})
+ defer bldr.Release()
+ vs := make([]arrow.Duration, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Duration(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
+ case []duration_ns:
+ bldr := array.NewDurationBuilder(mem, &arrow.DurationType{Unit: arrow.Nanosecond})
+ defer bldr.Release()
+ vs := make([]arrow.Duration, len(a))
+ for i, v := range a {
+ vs[i] = arrow.Duration(v)
+ }
+ bldr.AppendValues(vs, valids)
+ return bldr.NewArray()
+
default:
panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
}
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 5638f8b..86a372d 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -362,7 +362,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
*arrow.Time32Type, *arrow.Time64Type,
*arrow.TimestampType,
- *arrow.Date32Type, *arrow.Date64Type:
+ *arrow.Date32Type, *arrow.Date64Type,
+ *arrow.MonthIntervalType, *arrow.DayTimeIntervalType,
+ *arrow.DurationType:
return ctx.loadPrimitive(dt)
case *arrow.BinaryType, *arrow.StringType:
diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go
index 91db1f2..a0e9364 100644
--- a/go/arrow/ipc/metadata.go
+++ b/go/arrow/ipc/metadata.go
@@ -334,6 +334,25 @@ func (fv *fieldVisitor) visit(dt arrow.DataType) {
flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
fv.offset = flatbuf.FixedSizeListEnd(fv.b)
+ case *arrow.MonthIntervalType:
+ fv.dtype = flatbuf.TypeInterval
+ flatbuf.IntervalStart(fv.b)
+ flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH)
+ fv.offset = flatbuf.IntervalEnd(fv.b)
+
+ case *arrow.DayTimeIntervalType:
+ fv.dtype = flatbuf.TypeInterval
+ flatbuf.IntervalStart(fv.b)
+ flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME)
+ fv.offset = flatbuf.IntervalEnd(fv.b)
+
+ case *arrow.DurationType:
+ fv.dtype = flatbuf.TypeDuration
+ unit := unitToFB(dt.Unit)
+ flatbuf.DurationStart(fv.b)
+ flatbuf.DurationAddUnit(fv.b, unit)
+ fv.offset = flatbuf.DurationEnd(fv.b)
+
default:
err := errors.Errorf("arrow/ipc: invalid data type %v", dt)
panic(err) // FIXME(sbinet): implement all data-types.
@@ -537,6 +556,16 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr
dt.Init(data.Bytes, data.Pos)
return dateFromFB(dt)
+ case flatbuf.TypeInterval:
+ var dt flatbuf.Interval
+ dt.Init(data.Bytes, data.Pos)
+ return intervalFromFB(dt)
+
+ case flatbuf.TypeDuration:
+ var dt flatbuf.Duration
+ dt.Init(data.Bytes, data.Pos)
+ return durationFromFB(dt)
+
default:
// FIXME(sbinet): implement all the other types.
panic(fmt.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ]))
@@ -666,6 +695,30 @@ func dateFromFB(data flatbuf.Date) (arrow.DataType, error) {
return nil, errors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit())
}
+func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) {
+ switch data.Unit() {
+ case flatbuf.IntervalUnitYEAR_MONTH:
+ return arrow.FixedWidthTypes.MonthInterval, nil
+ case flatbuf.IntervalUnitDAY_TIME:
+ return arrow.FixedWidthTypes.DayTimeInterval, nil
+ }
+ return nil, errors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit())
+}
+
+// durationFromFB converts a flatbuffers Duration type descriptor into the
+// Arrow Duration data type carrying the same time unit (second, millisecond,
+// microsecond or nanosecond). Any other unit is reported as an error.
+func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) {
+	unit := data.Unit()
+	switch unit {
+	case flatbuf.TimeUnitSECOND:
+		return arrow.FixedWidthTypes.Duration_s, nil
+	case flatbuf.TimeUnitMILLISECOND:
+		return arrow.FixedWidthTypes.Duration_ms, nil
+	case flatbuf.TimeUnitMICROSECOND:
+		return arrow.FixedWidthTypes.Duration_us, nil
+	case flatbuf.TimeUnitNANOSECOND:
+		return arrow.FixedWidthTypes.Duration_ns, nil
+	default:
+		return nil, errors.Errorf("arrow/ipc: Duration type with %d unit not implemented", unit)
+	}
+}
+
type customMetadataer interface {
CustomMetadataLength() int
CustomMetadata(*flatbuf.KeyValue, int) bool