You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 23:50:31 UTC
[arrow] 03/03: ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 113ccb2b4b458c8c83894b269521b34e16ff68a8
Author: Laurent Quérel <la...@gmail.com>
AuthorDate: Tue Nov 15 13:36:06 2022 -0800
ARROW-18326: [Go] Add option to support dictionary deltas with IPC (#14639)
The test was not working because of a potential issue in dictionary management.
Lead-authored-by: Laurent Querel <l....@f5.com>
Co-authored-by: Matt Topol <zo...@gmail.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/arrow/ipc/ipc.go | 8 ++++++
go/arrow/ipc/ipc_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---
go/arrow/ipc/writer.go | 22 +++++++++--------
3 files changed, 81 insertions(+), 13 deletions(-)
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 71d810b136..44a19b8ae8 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -70,6 +70,7 @@ type config struct {
compressNP int
ensureNativeEndian bool
noAutoSchema bool
+ emitDictDeltas bool
}
func newConfig(opts ...Option) *config {
@@ -160,6 +161,13 @@ func WithDelayReadSchema(v bool) Option {
}
}
+// WithDictionaryDeltas specifies whether or not to emit dictionary deltas.
+func WithDictionaryDeltas(v bool) Option {
+ return func(cfg *config) {
+ cfg.emitDictDeltas = v
+ }
+}
+
var (
_ arrio.Reader = (*Reader)(nil)
_ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index d6d0abf225..1bde7192a5 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -24,12 +24,13 @@ import (
"strconv"
"testing"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/ipc"
"github.com/apache/arrow/go/v10/arrow/memory"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
)
func TestArrow12072(t *testing.T) {
@@ -345,7 +346,7 @@ func TestDictionary(t *testing.T) {
// IPC writer and reader
var bufWriter bytes.Buffer
- ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+ ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false))
defer ipcWriter.Close()
bufReader := bytes.NewReader([]byte{})
@@ -391,6 +392,63 @@ func TestDictionary(t *testing.T) {
ipcReader.Release()
}
+// ARROW-18326
+func TestDictionaryDeltas(t *testing.T) {
+ pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer pool.AssertSize(t, 0)
+
+ // A schema with a single dictionary field
+ schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+ IndexType: arrow.PrimitiveTypes.Uint16,
+ ValueType: arrow.BinaryTypes.String,
+ Ordered: false,
+ }}}, nil)
+
+ // IPC writer and reader
+ var bufWriter bytes.Buffer
+ ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true))
+ defer ipcWriter.Close()
+
+ bufReader := bytes.NewReader([]byte{})
+ var ipcReader *ipc.Reader
+
+ bldr := array.NewBuilder(pool, schema.Field(0).Type)
+ defer bldr.Release()
+ require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+ // Create a first record with field = "value_0"
+ record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+ defer record.Release()
+
+ expectedJson, err := record.MarshalJSON()
+ require.NoError(t, err)
+ // Serialize and deserialize the record via an IPC stream
+ json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+ require.NoError(t, err)
+ // Compare the expected JSON with the actual JSON
+ require.JSONEq(t, string(expectedJson), string(json))
+
+ // Create a second record with field = "value_1"
+ require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+ arr = bldr.NewArray()
+ defer arr.Release()
+ record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+ defer record.Release()
+
+ expectedJson, err = record.MarshalJSON()
+ require.NoError(t, err)
+ // Serialize and deserialize the record via an IPC stream
+ json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+ require.NoError(t, err)
+ // Compare the expected JSON with the actual JSON
+ // Guard against the decoded field being "value_0" when it should be "value_1".
+ require.JSONEq(t, string(expectedJson), string(json))
+ require.NoError(t, ipcReader.Err())
+ ipcReader.Release()
+}
+
// Encode and decode a record over a tuple of IPC writer and reader.
// IPC writer and reader are the same from one call to another.
func encodeDecodeIpcStream(t *testing.T,
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 9af88d9c2d..ef20e70815 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -99,11 +99,12 @@ type Writer struct {
func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
cfg := newConfig(opts...)
return &Writer{
- mem: cfg.alloc,
- pw: pw,
- schema: cfg.schema,
- codec: cfg.codec,
- compressNP: cfg.compressNP,
+ mem: cfg.alloc,
+ pw: pw,
+ schema: cfg.schema,
+ codec: cfg.codec,
+ compressNP: cfg.compressNP,
+ emitDictDeltas: cfg.emitDictDeltas,
}
}
@@ -111,11 +112,12 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
func NewWriter(w io.Writer, opts ...Option) *Writer {
cfg := newConfig(opts...)
return &Writer{
- w: w,
- mem: cfg.alloc,
- pw: &swriter{w: w},
- schema: cfg.schema,
- codec: cfg.codec,
+ w: w,
+ mem: cfg.alloc,
+ pw: &swriter{w: w},
+ schema: cfg.schema,
+ codec: cfg.codec,
+ emitDictDeltas: cfg.emitDictDeltas,
}
}