You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink and is not preserved in this copy.)
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/12/01 17:36:19 UTC
[arrow] branch master updated: GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 02bbcc5057 GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
02bbcc5057 is described below
commit 02bbcc5057302696227dc4a7be004c043ad7a68e
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Thu Dec 1 12:36:12 2022 -0500
GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
* Closes: #14780
Authored-by: Matt Topol <zo...@gmail.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/arrow/array/list.go | 2 +-
go/arrow/ipc/ipc_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++
go/arrow/ipc/writer.go | 19 ++++-----
3 files changed, 117 insertions(+), 11 deletions(-)
diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go
index 7cf70d7809..a6f1138524 100644
--- a/go/arrow/array/list.go
+++ b/go/arrow/array/list.go
@@ -154,7 +154,7 @@ func (a *List) Release() {
func (a *List) ValueOffsets(i int) (start, end int64) {
debug.Assert(i >= 0 && i < a.array.data.length, "index out of range")
- start, end = int64(a.offsets[i]), int64(a.offsets[i+1])
+ start, end = int64(a.offsets[i+a.data.offset]), int64(a.offsets[i+a.data.offset+1]) // i is relative to this (possibly sliced) array, so shift it by the array's data offset before indexing offsets
return
}
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2ca4066c89..2208a02a22 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -19,9 +19,11 @@ package ipc_test
import (
"bytes"
"errors"
+ "fmt"
"io"
"math/rand"
"strconv"
+ "strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -482,3 +484,108 @@ func encodeDecodeIpcStream(t *testing.T,
}
return json, ipcReader, nil
}
+
+func Example_mapSlice() { // Writes a one-row slice of a map column over IPC and prints the record read back.
+ mem := memory.DefaultAllocator
+ dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String)
+ schema := arrow.NewSchema([]arrow.Field{{
+ Name: "map",
+ Type: dt,
+ }}, nil)
+
+ arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+ [{"key": "index1", "value": "main2"}],
+ [{"key": "index3", "value": "main4"}, {"key": "tag_int", "value": ""}],
+ [{"key":"index5","value":"main6"},{"key":"tag_int","value":""}],
+ [{"key":"index6","value":"main7"},{"key":"tag_int","value":""}],
+ [{"key":"index7","value":"main8"},{"key":"tag_int","value":""}],
+ [{"key":"index8","value":"main9"}]
+ ]`))
+ if err != nil {
+ panic(err)
+ }
+ defer arr.Release()
+
+ rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+ defer rec.Release()
+ rec2 := rec.NewSlice(1, 2) // only row 1 (the "index3" entry), so the written map column starts at a non-zero offset
+ defer rec2.Release()
+
+ var buf bytes.Buffer
+ w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+ if err := w.Write(rec2); err != nil {
+ panic(err)
+ }
+ if err := w.Close(); err != nil {
+ panic(err)
+ }
+
+ r, err := ipc.NewReader(&buf)
+ if err != nil {
+ panic(err)
+ }
+ defer r.Release()
+
+ r.Next() // NOTE(review): Next's bool result and r.Err() are not checked here
+ fmt.Println(r.Record())
+
+ // Output:
+ // record:
+ // schema:
+ // fields: 1
+ // - map: type=map<utf8, utf8>
+ // rows: 1
+ // col[0][map]: [{["index3" "tag_int"] ["main4" ""]}]
+}
+
+func Example_listSlice() { // Writes a one-row slice of a list<utf8> column over IPC and prints the record read back.
+ mem := memory.DefaultAllocator
+ dt := arrow.ListOf(arrow.BinaryTypes.String)
+ schema := arrow.NewSchema([]arrow.Field{{
+ Name: "list",
+ Type: dt,
+ }}, nil)
+
+ arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+ ["index1"],
+ ["index3", "tag_int"], ["index5", "tag_int"],
+ ["index6", "tag_int"], ["index7", "tag_int"],
+ ["index7", "tag_int"],
+ ["index8"]
+ ]`))
+ if err != nil {
+ panic(err)
+ }
+ defer arr.Release()
+
+ rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+ defer rec.Release()
+ rec2 := rec.NewSlice(1, 2) // only row 1 (["index3" "tag_int"]), so the written list column starts at a non-zero offset
+ defer rec2.Release()
+
+ var buf bytes.Buffer
+ w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+ if err := w.Write(rec2); err != nil {
+ panic(err)
+ }
+ if err := w.Close(); err != nil {
+ panic(err)
+ }
+
+ r, err := ipc.NewReader(&buf)
+ if err != nil {
+ panic(err)
+ }
+ defer r.Release()
+
+ r.Next() // NOTE(review): Next's bool result and r.Err() are not checked here
+ fmt.Println(r.Record())
+
+ // Output:
+ // record:
+ // schema:
+ // fields: 1
+ // - list: type=list<item: utf8, nullable>
+ // rows: 1
+ // col[0][list]: [["index3" "tag_int"]]
+}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 64b9df72ca..088c641952 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -662,7 +662,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
values = arr.ListValues()
mustRelease = false
values_offset int64
- values_length int64
+ values_end int64
)
defer func() {
if mustRelease {
@@ -671,13 +671,13 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
}()
if voffsets != nil {
- values_offset = int64(arr.Offsets()[0])
- values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
+ values_offset, _ = arr.ValueOffsets(0)
+ _, values_end = arr.ValueOffsets(arr.Len() - 1)
}
- if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
+ if arr.Len() != 0 || values_end < int64(values.Len()) {
// must also slice the values
- values = array.NewSlice(values, values_offset, values_length)
+ values = array.NewSlice(values, values_offset, values_end)
mustRelease = true
}
err = w.visit(p, values)
@@ -699,7 +699,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
values = arr.ListValues()
mustRelease = false
values_offset int64
- values_length int64
+ values_end int64
)
defer func() {
if mustRelease {
@@ -709,13 +709,12 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
if arr.Len() > 0 && voffsets != nil {
values_offset, _ = arr.ValueOffsets(0)
- _, values_length = arr.ValueOffsets(arr.Len() - 1)
- values_length -= values_offset
+ _, values_end = arr.ValueOffsets(arr.Len() - 1)
}
- if arr.Len() != 0 || values_length < int64(values.Len()) {
+ if arr.Len() != 0 || values_end < int64(values.Len()) {
// must also slice the values
- values = array.NewSlice(values, values_offset, values_length)
+ values = array.NewSlice(values, values_offset, values_end)
mustRelease = true
}
err = w.visit(p, values)