Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 23:50:31 UTC

[arrow] 03/03: ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 113ccb2b4b458c8c83894b269521b34e16ff68a8
Author: Laurent Quérel <la...@gmail.com>
AuthorDate: Tue Nov 15 13:36:06 2022 -0800

    ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)
    
    Test not working because of a potential issue in dictionary management.
    
    Lead-authored-by: Laurent Querel <l....@f5.com>
    Co-authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/arrow/ipc/ipc.go      |  8 ++++++
 go/arrow/ipc/ipc_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---
 go/arrow/ipc/writer.go   | 22 +++++++++--------
 3 files changed, 81 insertions(+), 13 deletions(-)
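
The diff below adds the option itself (ipc.go), a regression test (ipc_test.go), and the writer wiring (writer.go). For readers who want to try the new option, here is a minimal, hedged usage sketch on the writer side; it is not part of the commit, and the column name and values are made up for illustration (mirroring the test below). With WithDictionaryDeltas(true), a dictionary that grows between record batches can be sent as a delta (only the appended values) rather than a full replacement.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/array"
	"github.com/apache/arrow/go/v10/arrow/ipc"
	"github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
	pool := memory.NewGoAllocator()

	// A schema with a single dictionary-encoded string column.
	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Uint16,
		ValueType: arrow.BinaryTypes.String,
	}}}, nil)

	var buf bytes.Buffer
	// Opt in to delta dictionaries; the default (false) keeps the existing
	// replacement-dictionary behaviour.
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema), ipc.WithAllocator(pool),
		ipc.WithDictionaryDeltas(true))
	defer w.Close()

	bldr := array.NewBuilder(pool, schema.Field(0).Type)
	defer bldr.Release()

	// Write two records built from the same dictionary builder, so the second
	// batch introduces a value the first one did not have.
	for _, payload := range []string{`["value_0"]`, `["value_1"]`} {
		if err := bldr.UnmarshalJSON([]byte(payload)); err != nil {
			log.Fatal(err)
		}
		arr := bldr.NewArray()
		rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len()))
		if err := w.Write(rec); err != nil {
			log.Fatal(err)
		}
		rec.Release()
		arr.Release()
	}

	fmt.Println("IPC stream size in bytes:", buf.Len())
}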

diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 71d810b136..44a19b8ae8 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -70,6 +70,7 @@ type config struct {
 	compressNP         int
 	ensureNativeEndian bool
 	noAutoSchema       bool
+	emitDictDeltas     bool
 }
 
 func newConfig(opts ...Option) *config {
@@ -160,6 +161,13 @@ func WithDelayReadSchema(v bool) Option {
 	}
 }
 
+// WithDictionaryDeltas specifies whether or not to emit dictionary deltas.
+func WithDictionaryDeltas(v bool) Option {
+	return func(cfg *config) {
+		cfg.emitDictDeltas = v
+	}
+}
+
 var (
 	_ arrio.Reader = (*Reader)(nil)
 	_ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index d6d0abf225..1bde7192a5 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -24,12 +24,13 @@ import (
 	"strconv"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/apache/arrow/go/v10/arrow"
 	"github.com/apache/arrow/go/v10/arrow/array"
 	"github.com/apache/arrow/go/v10/arrow/ipc"
 	"github.com/apache/arrow/go/v10/arrow/memory"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )
 
 func TestArrow12072(t *testing.T) {
@@ -345,7 +346,7 @@ func TestDictionary(t *testing.T) {
 
 	// IPC writer and reader
 	var bufWriter bytes.Buffer
-	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false))
 	defer ipcWriter.Close()
 
 	bufReader := bytes.NewReader([]byte{})
@@ -391,6 +392,63 @@ func TestDictionary(t *testing.T) {
 	ipcReader.Release()
 }
 
+// ARROW-18326
+func TestDictionaryDeltas(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	// A schema with a single dictionary field
+	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+		IndexType: arrow.PrimitiveTypes.Uint16,
+		ValueType: arrow.BinaryTypes.String,
+		Ordered:   false,
+	}}}, nil)
+
+	// IPC writer and reader
+	var bufWriter bytes.Buffer
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true))
+	defer ipcWriter.Close()
+
+	bufReader := bytes.NewReader([]byte{})
+	var ipcReader *ipc.Reader
+
+	bldr := array.NewBuilder(pool, schema.Field(0).Type)
+	defer bldr.Release()
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+	arr := bldr.NewArray()
+	defer arr.Release()
+	// Create a first record with field = "value_0"
+	record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err := record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	require.JSONEq(t, string(expectedJson), string(json))
+
+	// Create a second record with field = "value_1"
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+	arr = bldr.NewArray()
+	defer arr.Release()
+	record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err = record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	// field = "value_0" but should be "value_1"
+	require.JSONEq(t, string(expectedJson), string(json))
+	require.NoError(t, ipcReader.Err())
+	ipcReader.Release()
+}
+
 // Encode and decode a record over a tuple of IPC writer and reader.
 // IPC writer and reader are the same from one call to another.
 func encodeDecodeIpcStream(t *testing.T,
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 9af88d9c2d..ef20e70815 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -99,11 +99,12 @@ type Writer struct {
 func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
 	cfg := newConfig(opts...)
 	return &Writer{
-		mem:        cfg.alloc,
-		pw:         pw,
-		schema:     cfg.schema,
-		codec:      cfg.codec,
-		compressNP: cfg.compressNP,
+		mem:            cfg.alloc,
+		pw:             pw,
+		schema:         cfg.schema,
+		codec:          cfg.codec,
+		compressNP:     cfg.compressNP,
+		emitDictDeltas: cfg.emitDictDeltas,
 	}
 }
 
@@ -111,11 +112,12 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
 func NewWriter(w io.Writer, opts ...Option) *Writer {
 	cfg := newConfig(opts...)
 	return &Writer{
-		w:      w,
-		mem:    cfg.alloc,
-		pw:     &swriter{w: w},
-		schema: cfg.schema,
-		codec:  cfg.codec,
+		w:              w,
+		mem:            cfg.alloc,
+		pw:             &swriter{w: w},
+		schema:         cfg.schema,
+		codec:          cfg.codec,
+		emitDictDeltas: cfg.emitDictDeltas,
 	}
 }