You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "Laurent Querel (Jira)" <ji...@apache.org> on 2022/11/13 17:46:00 UTC
[jira] [Created] (ARROW-18317) [Go] Dictionary issue with IPC stream
Laurent Querel created ARROW-18317:
--------------------------------------
Summary: [Go] Dictionary issue with IPC stream
Key: ARROW-18317
URL: https://issues.apache.org/jira/browse/ARROW-18317
Project: Apache Arrow
Issue Type: Bug
Components: Go
Affects Versions: 10.0.0
Reporter: Laurent Querel
Dictionaries are not correctly updated when successive records are sent on the same IPC stream.
The following example creates a first record with a single field named "field", initialized with the value "value_0". This record is then serialized with an IPC writer and deserialized with an IPC reader.
A second record is then created with the value "value_1". After serialization and deserialization, the expected value for the field is "value_1" but I get "value_0".
Based on a quick analysis in the debugger, I suspect an error when combining the dictionary from step 1 with the dictionary from step 2.
Below is a code snippet that reproduces the issue.
{code:go}
// TestDictionary pushes two records, one at a time, through a single shared
// IPC writer/reader pair. Each record has one dictionary-encoded column whose
// value is "value_0" and then "value_1". Each decoded record must render to the
// same JSON as the record that was written.
//
// NOTE: Release methods are not managed in this test for simplicity.
func TestDictionary(t *testing.T) {
	pool := memory.NewGoAllocator()

	// Single field "field": uint16 dictionary indices over string values.
	dictType := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Uint16,
		ValueType: arrow.BinaryTypes.String,
		Ordered:   false,
	}
	schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: dictType}}, nil)

	// The writer, its buffer, the reader's input and the reader itself are
	// shared by every round trip, so later records travel on the same IPC
	// stream as the first one.
	var bufWriter bytes.Buffer
	ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
	bufReader := bytes.NewReader([]byte{})
	var ipcReader *ipc.Reader

	for _, value := range []int{0, 1} {
		record := CreateRecord(t, pool, schema, value)
		expectedJSON, err := record.MarshalJSON()
		require.NoError(t, err)

		// Serialize and deserialize via the shared IPC stream; the reader
		// created on the first pass is carried into the next one.
		var actualJSON []byte
		actualJSON, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
		require.NoError(t, err)

		// As reported, on the second pass the decoded field is "value_0"
		// although "value_1" was written.
		require.JSONEq(t, string(expectedJSON), string(actualJSON))
	}
}
// CreateRecord builds a one-row record for schema whose first field holds the
// string "value_<value>".
//
// The first field's builder is type-asserted to *array.BinaryDictionaryBuilder,
// so schema's first field must be a dictionary type with string values.
// Append failures abort the calling test.
//
// The returned record is owned by the caller.
func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record {
	t.Helper()

	rb := array.NewRecordBuilder(pool, schema)
	// The record produced by NewRecord keeps its own references to the built
	// arrays, so the builder can be released once this function returns.
	defer rb.Release()

	fieldB := rb.Field(0).(*array.BinaryDictionaryBuilder)
	if err := fieldB.AppendString(fmt.Sprintf("value_%d", value)); err != nil {
		t.Fatalf("appending dictionary value %d: %v", value, err)
	}
	return rb.NewRecord()
}
// EncodeDecodeIpcStream round-trips one record through an IPC stream. It does
// the following:
//   - writes record with ipcWriter, which writes into bufWriter;
//   - moves the produced bytes into bufReader;
//   - reads one record back with ipcReader;
//   - returns the decoded record rendered as JSON.
//
// When ipcReader is nil, a new reader is created over bufReader. Otherwise the
// existing reader keeps consuming bufReader, so successive calls share a
// single stream. The (possibly new) reader is always returned so the caller
// can pass it to the next call.
//
// An error is returned if writing fails, if the reader cannot be created, or
// if no record can be read (either the reader's error or, when the stream
// simply ended, a descriptive error).
func EncodeDecodeIpcStream(t *testing.T,
	record arrow.Record,
	bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
	bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
	t.Helper()

	// Serialize the record via the IPC writer.
	if err := ipcWriter.Write(record); err != nil {
		return nil, ipcReader, err
	}

	// Copy before Reset. The slice returned by bytes.Buffer.Bytes is only
	// valid until the next buffer modification, and the next Write would
	// reuse the same backing array.
	serializedRecord := append([]byte(nil), bufWriter.Bytes()...)
	bufWriter.Reset()

	// Feed the new bytes to the reader's underlying source.
	bufReader.Reset(serializedRecord)
	if ipcReader == nil {
		newIpcReader, err := ipc.NewReader(bufReader)
		if err != nil {
			return nil, ipcReader, err
		}
		ipcReader = newIpcReader
	}

	// Next reports false both on error and at end of stream. Check it
	// instead of dereferencing a possibly nil Record.
	if !ipcReader.Next() {
		if err := ipcReader.Err(); err != nil {
			return nil, ipcReader, err
		}
		return nil, ipcReader, fmt.Errorf("ipc stream ended before a record was read")
	}
	decoded := ipcReader.Record()

	// Return the decoded record as a JSON string.
	json, err := decoded.MarshalJSON()
	if err != nil {
		return nil, ipcReader, err
	}
	return json, ipcReader, nil
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)