You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 23:50:29 UTC
[arrow] 01/03: ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 60e1d81ed9c7ff409d96f1c658698df19ce79748
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Tue Nov 15 10:55:56 2022 -0500
ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
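Fix dictionary replacement for IPC streams. Previously, when a non-delta dictionary message arrived for a dictionary ID that was already in the memo table, the new dictionary was concatenated onto the existing one instead of replacing it. A non-delta dictionary message now replaces the existing dictionary (releasing the old data), while delta messages still append.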
Authored-by: Matt Topol <zo...@gmail.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/arrow/internal/dictutils/dict.go | 10 +++-
go/arrow/ipc/ipc_test.go | 94 +++++++++++++++++++++++++++++++++++++
2 files changed, 103 insertions(+), 1 deletion(-)
diff --git a/go/arrow/internal/dictutils/dict.go b/go/arrow/internal/dictutils/dict.go
index 9dd17c6540..561923f626 100644
--- a/go/arrow/internal/dictutils/dict.go
+++ b/go/arrow/internal/dictutils/dict.go
@@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) {
memo.id2dict[id] = append(d, v)
}
+// AddOrReplace puts the provided dictionary into the memo table. If it
+// already exists, then the new data will replace it. Otherwise it is added
+// to the memo table.
func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool {
d, ok := memo.id2dict[id]
if ok {
- d = append(d, v)
+ // replace the dictionary and release any existing ones
+ for _, dict := range d {
+ dict.Release()
+ }
+ d[0] = v
+ d = d[:1]
} else {
d = []arrow.ArrayData{v}
}
diff --git a/go/arrow/ipc/ipc_test.go b/go/arrow/ipc/ipc_test.go
index 2f0816c0d0..d6d0abf225 100644
--- a/go/arrow/ipc/ipc_test.go
+++ b/go/arrow/ipc/ipc_test.go
@@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) {
n++
}
}
+
+// ARROW-18317
+func TestDictionary(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	// A schema with a single unordered dictionary<uint16, string> field.
+	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
+		IndexType: arrow.PrimitiveTypes.Uint16,
+		ValueType: arrow.BinaryTypes.String,
+		Ordered:   false,
+	}}}, nil)
+
+	// One writer/reader pair is reused for both records, so the second
+	// dictionary message arrives on a stream that already carries one.
+	var bufWriter bytes.Buffer
+	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
+	defer ipcWriter.Close()
+
+	bufReader := bytes.NewReader([]byte{})
+	// Created lazily by encodeDecodeIpcStream; released even if a require fails.
+	var ipcReader *ipc.Reader
+	defer func() {
+		if ipcReader != nil {
+			ipcReader.Release()
+		}
+	}()
+
+	bldr := array.NewBuilder(pool, schema.Field(0).Type)
+	defer bldr.Release()
+
+	// First record: field = "value_0".
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))
+	arr := bldr.NewArray()
+	defer arr.Release()
+	record := array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJSON, err := record.MarshalJSON()
+	require.NoError(t, err)
+	gotJSON, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	require.JSONEq(t, string(expectedJSON), string(gotJSON))
+
+	// Second record: field = "value_1". Its dictionary must replace the first
+	// one on the reader side; before ARROW-18317 the dictionaries were
+	// concatenated and the decoded field came back as "value_0".
+	require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
+	arr = bldr.NewArray()
+	defer arr.Release()
+	record = array.NewRecord(schema, []arrow.Array{arr}, 1)
+	defer record.Release()
+
+	expectedJSON, err = record.MarshalJSON()
+	require.NoError(t, err)
+	gotJSON, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
+	require.NoError(t, err)
+	require.JSONEq(t, string(expectedJSON), string(gotJSON))
+	require.NoError(t, ipcReader.Err())
+}
+
+// encodeDecodeIpcStream round-trips record through ipcWriter and ipcReader (created
+// on first use when nil), returning the decoded record's JSON and the reader to reuse.
+func encodeDecodeIpcStream(t *testing.T,
+	record arrow.Record,
+	bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
+	bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
+	t.Helper()
+	if err := ipcWriter.Write(record); err != nil {
+		return nil, ipcReader, err
+	}
+	// Copy: bufWriter's backing array is reused by the next Write.
+	serialized := append([]byte(nil), bufWriter.Bytes()...)
+	bufWriter.Reset()
+
+	bufReader.Reset(serialized)
+	if ipcReader == nil {
+		r, err := ipc.NewReader(bufReader)
+		if err != nil {
+			return nil, r, err
+		}
+		ipcReader = r
+	}
+	// Next is false at end of stream or on error; there is no record to read then.
+	if !ipcReader.Next() {
+		require.NoError(t, ipcReader.Err())
+		t.Fatal("ipc reader: expected a record, got end of stream")
+	}
+	got, err := ipcReader.Record().MarshalJSON()
+	if err != nil {
+		return nil, ipcReader, err
+	}
+	return got, ipcReader, nil
+}