You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "Laurent Querel (Jira)" <ji...@apache.org> on 2022/11/13 17:46:00 UTC

[jira] [Created] (ARROW-18317) [Go] Dictionary issue with IPC stream

Laurent Querel created ARROW-18317:
--------------------------------------

             Summary: [Go] Dictionary issue with IPC stream
                 Key: ARROW-18317
                 URL: https://issues.apache.org/jira/browse/ARROW-18317
             Project: Apache Arrow
          Issue Type: Bug
          Components: Go
    Affects Versions: 10.0.0
            Reporter: Laurent Querel


Dictionaries are not correctly updated when sending a record on an IPC stream. 

The following example creates a first record with a single field named "field", initialized with the value "value_0". This record is then serialized with an IPC writer and deserialized with an IPC reader.

A second record is then created with the value "value_1". After serialization and deserialization, the expected value for the field is "value_1" but I get "value_0". 

Based on a quick analysis via the debugger, I suspect an error in combining the dictionary from step 1 with the dictionary from step 2.

Below is a code snippet to reproduce the issue.
{code:java}
// NOTE: Release methods are not managed in this test for simplicity.
func TestDictionary(t *testing.T) {
   pool := memory.NewGoAllocator()
   // A schema with a single dictionary field
   schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
      IndexType: arrow.PrimitiveTypes.Uint16,
      ValueType: arrow.BinaryTypes.String,
      Ordered:   false,
   }}}, nil)
   // IPC writer and reader
   var bufWriter bytes.Buffer
   ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
   bufReader := bytes.NewReader([]byte{})
   var ipcReader *ipc.Reader

   // Create a first record with field = "value_0" 
   record := CreateRecord(t, pool, schema, 0)
   expectedJson, err := record.MarshalJSON()
   require.NoError(t, err)
   // Serialize and deserialize the record via an IPC stream
   json, ipcReader, err := EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
   require.NoError(t, err)
   // Compare the expected JSON with the actual JSON
   require.JSONEq(t, string(expectedJson), string(json))

   // Create a second record with field = "value_1" 
   record = CreateRecord(t, pool, schema, 1)
   expectedJson, err = record.MarshalJSON()
   require.NoError(t, err)
   // Serialize and deserialize the record via an IPC stream
   json, ipcReader, err = EncodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
   require.NoError(t, err)
   // Compare the expected JSON with the actual JSON
   // field = "value_0" but should be "value_1"
   require.JSONEq(t, string(expectedJson), string(json))
}

// CreateRecord builds a record for schema whose first column receives the
// single string "value_<value>". The builder of column 0 must be a
// *array.BinaryDictionaryBuilder, otherwise the type assertion panics.
// A failure to append the string aborts the test through t.Fatal.
func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record {
   builder := array.NewRecordBuilder(pool, schema)
   dictBuilder := builder.Field(0).(*array.BinaryDictionaryBuilder)
   if err := dictBuilder.AppendString(fmt.Sprintf("value_%d", value)); err != nil {
      t.Fatal(err)
   }
   return builder.NewRecord()
}

// EncodeDecodeIpcStream writes record with ipcWriter, hands the bytes that
// ended up in bufWriter to bufReader, reads one record back through an IPC
// reader and returns that record marshalled as JSON.
//
// When ipcReader is nil a new reader is created on top of bufReader. The
// reader that was used is returned so the caller can pass it in again on the
// next call and keep decoding the same stream.
//
// An error is returned when writing, creating the reader, reading or
// marshalling fails, and also when the reader yields no record at all.
func EncodeDecodeIpcStream(t *testing.T,
   record arrow.Record,
   bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
   bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
   t.Helper()

   // Serialize the record via an ipc writer
   if err := ipcWriter.Write(record); err != nil {
      return nil, ipcReader, err
   }
   // Bytes aliases the buffer's storage, which Reset keeps for reuse. The
   // slice is only read through bufReader below, and the writer is not used
   // again inside this function.
   serializedRecord := bufWriter.Bytes()
   bufWriter.Reset()

   // Deserialize the record via an ipc reader
   bufReader.Reset(serializedRecord)
   if ipcReader == nil {
      newIpcReader, err := ipc.NewReader(bufReader)
      if err != nil {
         return nil, newIpcReader, err
      }
      ipcReader = newIpcReader
   }
   if !ipcReader.Next() {
      // Without this check Record() would be used even though nothing was
      // decoded, and MarshalJSON would be called on it. Prefer the reader's
      // own error; fall back to an explicit one when it reports none.
      if err := ipcReader.Err(); err != nil {
         return nil, ipcReader, err
      }
      return nil, ipcReader, fmt.Errorf("ipc reader returned no record from %d serialized bytes", len(serializedRecord))
   }
   decoded := ipcReader.Record()

   // Return the decoded record as a json string
   encoded, err := decoded.MarshalJSON()
   if err != nil {
      return nil, ipcReader, err
   }
   return encoded, ipcReader, nil
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)