Posted to user@avro.apache.org by Austin Cawley-Edwards <au...@gmail.com> on 2019/12/02 01:16:02 UTC

Deserialize list of JSON-encoded records with evolved Schema

Hi,

We are trying to encode a list of records into JSON with one schema and
then decode the list into Avro objects with a compatible schema. The schema
resolution between the two schemas works for single records, but the
deserialization fails when the read schema differs from the write schema.
Deserialization works, however, when the same schema is used for both.
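
For reference, this is roughly what we mean by single-record resolution working
(a minimal sketch using GenericRecord; TEST_RECORD_V1_JSON and TEST_RECORD_V2_JSON
are placeholders for the two schema definitions shown further down, and the usual
org.apache.avro imports are assumed):

  Schema v1 = new Schema.Parser().parse(TEST_RECORD_V1_JSON);
  Schema v2 = new Schema.Parser().parse(TEST_RECORD_V2_JSON);

  GenericRecord written = new GenericData.Record(v1);
  written.put("text", "hello");

  // Encode a single V1 record as JSON
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  Encoder encoder = EncoderFactory.get().jsonEncoder(v1, out);
  new GenericDatumWriter<GenericRecord>(v1).write(written, encoder);
  encoder.flush();

  // Decode it with V1 as the writer schema and V2 as the reader schema
  Decoder decoder = DecoderFactory.get().jsonDecoder(v1, out.toString());
  GenericRecord read = new GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder);
  // read.get("number") is 0, filled in from the default declared in V2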

When decoding, an exception is thrown:

org.apache.avro.AvroTypeException: Attempt to process a item-end when a int
was expected.
       org.apache.avro.io.parsing.Parser.advance(Parser.java:93)
       org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
       org.apache.avro.io.JsonDecoder.arrayNext(JsonDecoder.java:360)

It seems like the decoder is not moving the proper number of bytes down to
read the next element.

We encode like so:

public static <T extends GenericRecord> String
toJSONArrayString(List<T> avroRecords, Schema schema) throws
IOException {

  if (avroRecords == null || avroRecords.isEmpty()) {
    return "[]";
  }

  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  Encoder encoder =
ENCODER_FACTORY.jsonEncoder(Schema.createArray(schema), baos);
  DatumWriter<T> datumWriter = avroRecords.get(0) instanceof SpecificRecord
      ? new SpecificDatumWriter<>(schema)
      : new GenericDatumWriter<>(schema);

  encoder.writeArrayStart();
  encoder.setItemCount(avroRecords.size());
  for (T record : avroRecords) {
    encoder.startItem();
    datumWriter.write(record, encoder);
  }
  encoder.writeArrayEnd();
  encoder.flush();

  return baos.toString();
}


And decode similarly:

public static <T extends GenericRecord> List<T>
fromJSONArrayString(String jsonArrayString, Schema writeSchema, Schema
readSchema) throws IOException {
  Schema readArrSchema = Schema.createArray(readSchema);
  Decoder decoder = DECODER_FACTORY.jsonDecoder(readArrSchema, jsonArrayString);
  DatumReader<T> datumReader;
  if (writeSchema.equals(readSchema)) {
    datumReader = new SpecificDatumReader<>(readSchema);
  } else {
    datumReader = new SpecificDatumReader<>(writeSchema, readSchema);
  }

  List<T> avroRecords = new ArrayList<>();
  for (long i = decoder.readArrayStart(); i != 0; i = decoder.arrayNext()) {
    for (long j = 0; j < i; j++) {
      avroRecords.add(datumReader.read(null, decoder));
    }
  }

  return avroRecords;
}
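
For completeness, this is roughly how we exercise the two helpers (a sketch;
the record construction is illustrative, and v1Schema/v2Schema stand for the
parsed versions of the two schemas below):

  GenericRecord record = new GenericData.Record(v1Schema);
  record.put("text", "hello");

  String json = toJSONArrayString(Collections.singletonList(record), v1Schema);

  // Works when the same schema is used on both sides
  List<GenericRecord> same = fromJSONArrayString(json, v1Schema, v1Schema);

  // Throws the AvroTypeException above when the read schema differs
  List<GenericRecord> evolved = fromJSONArrayString(json, v1Schema, v2Schema);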



Our two schemas look like:

{
  "type": "record",
  "name": "TestRecordV1",
  "fields": [
    {
      "name": "text",
      "type": "string"
    }
  ]
}

{
  "type": "record",
  "name": "TestRecordV2",
  "fields": [
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "number",
      "type": "int",
      "default": 0
    }
  ]
}



Is there something simple we are missing, or is it not possible to do schema
resolution dynamically on an entire array?

Thank you!
Austin

Re: Deserialize list of JSON-encoded records with evolved Schema

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hi Zoltan,

Yes, that works perfectly for me 🤦‍♂.

Thank you!
Austin

On Mon, Dec 2, 2019 at 8:44 AM Zoltan Farkas <zo...@yahoo.com> wrote:

> The error suggests that you are attempting to parse a message encoded with
> TestRecordV1 while using TestRecordV2 as the writer schema instead of
> TestRecordV1.
>
>
> Make sure that when you deserialize a TestRecordV1 array into a TestRecordV2
> array, you initialize your JSON decoder with the writer schema, not the
> reader one:
>
>  Decoder decoder = DECODER_FACTORY.jsonDecoder(writeArrSchema, jsonArrayString);
>
>
> hope it helps.
>
> —Z

Re: Deserialize list of JSON-encoded records with evolved Schema

Posted by Zoltan Farkas <zo...@yahoo.com>.
The error suggests that you are attempting to parse a message encoded with TestRecordV1 while using TestRecordV2 as the writer schema instead of TestRecordV1.


Make sure that when you deserialize a TestRecordV1 array into a TestRecordV2 array, you initialize your JSON decoder with the writer schema, not the reader one:

>  Decoder decoder = DECODER_FACTORY.jsonDecoder(writeArrSchema, jsonArrayString);
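
Applied to your fromJSONArrayString helper, the change would look roughly like
this (a sketch; the main change is building the JsonDecoder from the writer's
array schema, and the two-argument SpecificDatumReader handles the
identical-schema case as well):

public static <T extends GenericRecord> List<T> fromJSONArrayString(String jsonArrayString, Schema writeSchema, Schema readSchema) throws IOException {
  // The JsonDecoder must be driven by the schema the JSON was written with
  Schema writeArrSchema = Schema.createArray(writeSchema);
  Decoder decoder = DECODER_FACTORY.jsonDecoder(writeArrSchema, jsonArrayString);

  // The DatumReader then resolves writer -> reader, filling in defaults such as "number"
  DatumReader<T> datumReader = new SpecificDatumReader<>(writeSchema, readSchema);

  List<T> avroRecords = new ArrayList<>();
  for (long i = decoder.readArrayStart(); i != 0; i = decoder.arrayNext()) {
    for (long j = 0; j < i; j++) {
      avroRecords.add(datumReader.read(null, decoder));
    }
  }
  return avroRecords;
}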

hope it helps.

—Z


