You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not reproduced in this text.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/12/01 17:36:19 UTC

[arrow] branch master updated: GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 02bbcc5057 GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
02bbcc5057 is described below

commit 02bbcc5057302696227dc4a7be004c043ad7a68e
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Thu Dec 1 12:36:12 2022 -0500

    GH-14780: [Go] Fix issues with IPC writing of sliced map/list arrays (#14793)
    
    
    * Closes: #14780
    
    Authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/arrow/array/list.go   |   2 +-
 go/arrow/ipc/ipc_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++
 go/arrow/ipc/writer.go   |  19 ++++-----
 3 files changed, 117 insertions(+), 11 deletions(-)

diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go
index 7cf70d7809..a6f1138524 100644
--- a/go/arrow/array/list.go
+++ b/go/arrow/array/list.go
@@ -154,7 +154,7 @@ func (a *List) Release() {
 
 func (a *List) ValueOffsets(i int) (start, end int64) {
 	debug.Assert(i >= 0 && i < a.array.data.length, "index out of range")
-	start, end = int64(a.offsets[i]), int64(a.offsets[i+1])
+	start, end = int64(a.offsets[i+a.data.offset]), int64(a.offsets[i+a.data.offset+1]) // i is relative to this (possibly sliced) array, while a.offsets is indexed from the start of the underlying buffer, so shift by the slice offset
 	return
 }
 
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2ca4066c89..2208a02a22 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -19,9 +19,11 @@ package ipc_test
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io"
 	"math/rand"
 	"strconv"
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -482,3 +484,108 @@ func encodeDecodeIpcStream(t *testing.T,
 	}
 	return json, ipcReader, nil
 }
+
+func Example_mapSlice() { // writes a one-row slice of a map column through the IPC writer and reads it back
+	mem := memory.DefaultAllocator
+	dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String)
+	schema := arrow.NewSchema([]arrow.Field{{
+		Name: "map",
+		Type: dt,
+	}}, nil)
+
+	arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+		[{"key": "index1", "value": "main2"}],
+		[{"key": "index3", "value": "main4"}, {"key": "tag_int", "value": ""}],
+		[{"key":"index5","value":"main6"},{"key":"tag_int","value":""}],
+		[{"key":"index6","value":"main7"},{"key":"tag_int","value":""}],
+		[{"key":"index7","value":"main8"},{"key":"tag_int","value":""}],
+		[{"key":"index8","value":"main9"}]
+	]`))
+	if err != nil {
+		panic(err)
+	}
+	defer arr.Release()
+
+	rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+	defer rec.Release()
+	rec2 := rec.NewSlice(1, 2) // keep only row 1, so the written column has a non-zero slice offset
+	defer rec2.Release()
+
+	var buf bytes.Buffer
+	w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+	if err := w.Write(rec2); err != nil {
+		panic(err)
+	}
+	if err := w.Close(); err != nil {
+		panic(err)
+	}
+
+	r, err := ipc.NewReader(&buf)
+	if err != nil {
+		panic(err)
+	}
+	defer r.Release()
+
+	r.Next() // NOTE(review): result is unchecked; r.Record() is presumably nil if no batch was read — confirm
+	fmt.Println(r.Record())
+
+	// Output:
+	// record:
+	//   schema:
+	//   fields: 1
+	//     - map: type=map<utf8, utf8>
+	//   rows: 1
+	//   col[0][map]: [{["index3" "tag_int"] ["main4" ""]}]
+}
+
+func Example_listSlice() { // writes a one-row slice of a list column through the IPC writer and reads it back
+	mem := memory.DefaultAllocator
+	dt := arrow.ListOf(arrow.BinaryTypes.String)
+	schema := arrow.NewSchema([]arrow.Field{{
+		Name: "list",
+		Type: dt,
+	}}, nil)
+
+	arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[
+		["index1"], 
+		["index3", "tag_int"], ["index5", "tag_int"],
+		["index6", "tag_int"], ["index7", "tag_int"], 
+		["index7", "tag_int"],
+		["index8"]
+	]`))
+	if err != nil {
+		panic(err)
+	}
+	defer arr.Release()
+
+	rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
+	defer rec.Release()
+	rec2 := rec.NewSlice(1, 2) // keep only row 1 (["index3", "tag_int"]), so the written column has a non-zero slice offset
+	defer rec2.Release()
+
+	var buf bytes.Buffer
+	w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema()))
+	if err := w.Write(rec2); err != nil {
+		panic(err)
+	}
+	if err := w.Close(); err != nil {
+		panic(err)
+	}
+
+	r, err := ipc.NewReader(&buf)
+	if err != nil {
+		panic(err)
+	}
+	defer r.Release()
+
+	r.Next() // NOTE(review): result is unchecked; r.Record() is presumably nil if no batch was read — confirm
+	fmt.Println(r.Record())
+
+	// Output:
+	// record:
+	//   schema:
+	//   fields: 1
+	//     - list: type=list<item: utf8, nullable>
+	//   rows: 1
+	//   col[0][list]: [["index3" "tag_int"]]
+}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 64b9df72ca..088c641952 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -662,7 +662,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
 			values        = arr.ListValues()
 			mustRelease   = false
 			values_offset int64
-			values_length int64
+			values_end    int64
 		)
 		defer func() {
 			if mustRelease {
@@ -671,13 +671,13 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
 		}()
 
 		if voffsets != nil {
-			values_offset = int64(arr.Offsets()[0])
-			values_length = int64(arr.Offsets()[arr.Len()]) - values_offset
+			values_offset = int64(arr.Offsets()[arr.Data().Offset()]) // Offsets() spans the parent buffer, so shift by the slice offset; same value as ValueOffsets(0) when Len() > 0
+			values_end = int64(arr.Offsets()[arr.Data().Offset()+arr.Len()]) // same as the end of ValueOffsets(Len()-1), but does not index out of range when Len() == 0
 		}
 
-		if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) {
+		if arr.Len() != 0 || values_end < int64(values.Len()) {
 			// must also slice the values
-			values = array.NewSlice(values, values_offset, values_length)
+			values = array.NewSlice(values, values_offset, values_end)
 			mustRelease = true
 		}
 		err = w.visit(p, values)
@@ -699,7 +699,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
 			values        = arr.ListValues()
 			mustRelease   = false
 			values_offset int64
-			values_length int64
+			values_end    int64
 		)
 		defer func() {
 			if mustRelease {
@@ -709,13 +709,12 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error {
 
 		if arr.Len() > 0 && voffsets != nil {
 			values_offset, _ = arr.ValueOffsets(0)
-			_, values_length = arr.ValueOffsets(arr.Len() - 1)
-			values_length -= values_offset
+			_, values_end = arr.ValueOffsets(arr.Len() - 1)
 		}
 
-		if arr.Len() != 0 || values_length < int64(values.Len()) {
+		if arr.Len() != 0 || values_end < int64(values.Len()) {
 			// must also slice the values
-			values = array.NewSlice(values, values_offset, values_length)
+			values = array.NewSlice(values, values_offset, values_end)
 			mustRelease = true
 		}
 		err = w.visit(p, values)