Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 23:50:29 UTC

[arrow] 01/03: ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 60e1d81ed9c7ff409d96f1c658698df19ce79748
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Tue Nov 15 10:55:56 2022 -0500

    ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
    
    Fix dictionary replacement for IPC streams. Currently, a non-delta dictionary batch is incorrectly concatenated onto the existing dictionary instead of replacing it. With this change, the reader properly replaces the dictionary whenever it encounters a non-delta dictionary message.
    
    Authored-by: Matt Topol <zo...@gmail.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
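
In Memo terms, the change means a non-delta dictionary message now discards whatever was previously recorded for that id instead of appending to it, while delta messages keep accumulating. A minimal sketch of the resulting semantics (illustrative only; id, dictA, dictB, and dictC are placeholder values, not identifiers from this patch):

    memo.AddOrReplace(id, dictA) // first dictionary seen for id
    memo.AddDelta(id, dictB)     // delta message: kept alongside dictA
    memo.AddOrReplace(id, dictC) // non-delta message: dictA and dictB are released,
                                 // dictC becomes the only dictionary for id
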
---
 go/arrow/internal/dictutils/dict.go | 10 +++-
 go/arrow/ipc/ipc_test.go            | 94 +++++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/go/arrow/internal/dictutils/dict.go b/go/arrow/internal/dictutils/dict.go
index 9dd17c6540..561923f626 100644
--- a/go/arrow/internal/dictutils/dict.go
+++ b/go/arrow/internal/dictutils/dict.go
@@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) {
 	memo.id2dict[id] = append(d, v)
 }
 
+// AddOrReplace puts the provided dictionary into the memo table. If it
+// already exists, then the new data will replace it. Otherwise it is added
+// to the memo table.
 func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool {
 	d, ok := memo.id2dict[id]
 	if ok {
-		d = append(d, v)
+		// replace the dictionary and release any existing ones
+		for _, dict := range d {
+			dict.Release()
+		}
+		d[0] = v
+		d = d[:1]
 	} else {
 		d = []arrow.ArrayData{v}
 	}
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2f0816c0d0..d6d0abf225 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) {
 		n++
 	}
 }
+
+// ARROW-18317
+func TestDictionary(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	// A schema with a single dictionary field
+	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+		IndexType: arrow.PrimitiveTypes.Uint16,
+		ValueType: arrow.BinaryTypes.String,
+		Ordered:   false,
+	}}}, nil)
+
+	// IPC writer and reader
+	var bufWriter bytes.Buffer
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+	defer ipcWriter.Close()
+
+	bufReader := bytes.NewReader([]byte{})
+	var ipcReader *ipc.Reader
+
+	bldr := array.NewBuilder(pool, schema.Field(0).Type)
+	defer bldr.Release()
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+
+	arr := bldr.NewArray()
+	defer arr.Release()
+	// Create a first record with field = "value_0"
+	record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJson, err := record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	require.JSONEq(t, string(expectedJson), string(json))
+
+	// Create a second record with field = "value_1"
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+	arr = bldr.NewArray()
+	defer arr.Release()
+	record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+
+	// record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`))
+	// require.NoError(t, err)
+	defer record.Release()
+
+	expectedJson, err = record.MarshalJSON()
+	require.NoError(t, err)
+	// Serialize and deserialize the record via an IPC stream
+	json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	// Compare the expected JSON with the actual JSON
+	// field = "value_0" but should be "value_1"
+	require.JSONEq(t, string(expectedJson), string(json))
+	require.NoError(t, ipcReader.Err())
+	ipcReader.Release()
+}
+
+// Encode and decode a record over a tuple of IPC writer and reader.
+// IPC writer and reader are the same from one call to another.
+func encodeDecodeIpcStream(t *testing.T,
+	record arrow.Record,
+	bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
+	bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
+
+	// Serialize the record via an ipc writer
+	if err := ipcWriter.Write(record); err != nil {
+		return nil, ipcReader, err
+	}
+	serializedRecord := bufWriter.Bytes()
+	bufWriter.Reset()
+
+	// Deserialize the record via an ipc reader
+	bufReader.Reset(serializedRecord)
+	if ipcReader == nil {
+		newIpcReader, err := ipc.NewReader(bufReader)
+		if err != nil {
+			return nil, newIpcReader, err
+		}
+		ipcReader = newIpcReader
+	}
+	ipcReader.Next()
+	record = ipcReader.Record()
+
+	// Return the decoded record as a json string
+	json, err := record.MarshalJSON()
+	if err != nil {
+		return nil, ipcReader, err
+	}
+	return json, ipcReader, nil
+}
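
For completeness, here is a standalone round trip showing the behavior the new test asserts. This is a sketch, not part of the commit: it assumes the github.com/apache/arrow/go/v10 module path (matching the maint-10.0.x branch) and uses only APIs that already appear in the test above.

package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/array"
	"github.com/apache/arrow/go/v10/arrow/ipc"
	"github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	// Same schema as the test: a single dictionary-encoded string column.
	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Uint16,
		ValueType: arrow.BinaryTypes.String,
	}}}, nil)

	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema))

	bldr := array.NewBuilder(mem, schema.Field(0).Type)
	defer bldr.Release()

	// Two batches whose dictionaries differ: the second batch makes the writer
	// re-send the dictionary as a non-delta message, which the reader must
	// replace rather than concatenate.
	for _, js := range []string{`["value_0"]`, `["value_1"]`} {
		if err := bldr.UnmarshalJSON([]byte(js)); err != nil {
			panic(err)
		}
		arr := bldr.NewArray()
		rec := array.NewRecord(schema, []arrow.Array{arr}, 1)
		if err := w.Write(rec); err != nil {
			panic(err)
		}
		rec.Release()
		arr.Release()
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	r, err := ipc.NewReader(&buf)
	if err != nil {
		panic(err)
	}
	defer r.Release()
	for r.Next() {
		js, err := r.Record().MarshalJSON()
		if err != nil {
			panic(err)
		}
		// With the fix, the second record prints "value_1"; before it, the
		// replaced dictionary was concatenated onto the old one and the
		// indices still resolved to "value_0".
		fmt.Println(string(js))
	}
	if err := r.Err(); err != nil {
		panic(err)
	}
}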