You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by Martin Mucha <al...@gmail.com> on 2019/08/01 13:30:11 UTC

Re: AVRO schema evolution: adding optional column with default fails deserialization

Hi,

just one more question, not strictly related to the subject.

Initially I though I'd be OK with using some initial version of schema in
place of writer schema. That works, but all columns from schema older than
this initial one would be just ignored. So I need to know EXACTLY the
schema, which writer used. I know, that avro messages contains either full
schema or at least it's ID. Can you point me to the documentation, where
this is discussed? So in my deserializer I have byte[] as a input, from
which I need to get the schema information first, in order to be able to
deserialize the record. I really do not know how to do that, I'm pretty
sure I never saw this anywhere, and I cannot find it anywhere. But in
principle it must be possible, since reader need not necessarily have any
control of which schema writer used.

thanks a lot.
M.

út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com> napsal:

> Thank you very much for in depth answer. I understand how it works now
> better, will test it shortly.
> Thank you for your time.
>
> Martin.
>
> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>
>> Hello!  It's the same issue in your example code as allegro, even with
>> the SpecificDatumReader.
>>
>> This line: datumReader = new SpecificDatumReader<>(schema)
>> should be: datumReader = new SpecificDatumReader<>(originalSchema, schema)
>>
>> In Avro, the original schema is commonly known as the writer schema
>> (the instance that originally wrote the binary data).  Schema
>> evolution applies when you are using the constructor of the
>> SpecificDatumReader that takes *both* reader and writer schemas.
>>
>> As a concrete example, if your original schema was:
>>
>> {
>>   "type": "record",
>>   "name": "Simple",
>>   "fields": [
>>     {"name": "id", "type": "int"},
>>     {"name": "name","type": "string"}
>>   ]
>> }
>>
>> And you added a field:
>>
>> {
>>   "type": "record",
>>   "name": "SimpleV2",
>>   "fields": [
>>     {"name": "id", "type": "int"},
>>     {"name": "name", "type": "string"},
>>     {"name": "description","type": ["null", "string"]}
>>   ]
>> }
>>
>> You could do the following safely, assuming that Simple and SimpleV2
>> classes are generated from the avro-maven-plugin:
>>
>> @Test
>> public void testSerializeDeserializeEvolution() throws IOException {
>>   // Write a Simple v1 to bytes using your exact method.
>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>
>>   // Read as Simple v2, same as your method but with the writer and
>> reader schema.
>>   DatumReader<SimpleV2> datumReader =
>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>> SimpleV2.getClassSchema());
>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>
>>   assertThat(v2.getId(), is(1));
>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>   assertThat(v2.getDescription(), nullValue());
>> }
>>
>> This demonstrates with two different schemas and SpecificRecords in
>> the same test, but the same principle applies if it's the same record
>> that has evolved -- you need to know the original schema that wrote
>> the data in order to apply the schema that you're now using for
>> reading.
>>
>> I hope this clarifies what you are looking for!
>>
>> All my best, Ryan
>>
>>
>>
>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com> wrote:
>> >
>> > Thanks for answer.
>> >
>> > Actually I have exactly the same behavior with avro 1.9.0 and following
>> deserializer in our other app, which uses strictly avro codebase, and
>> failing with same exceptions. So lets leave "allegro" library and lots of
>> other tools out of it in our discussion.
>> > I can use whichever aproach. All I need is single way, where I can
>> deserialize byte[] into class generated by avro-maven-plugin, and which
>> will respect documentation regarding schema evolution. Currently we're
>> using following deserializer and serializer, and these does not work when
>> it comes to schema evolution. What is the correct way to serialize and
>> deserializer avro data?
>> >
>> > I probably don't understand your mention about GenericRecord or
>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>> below, but then it seems I got back just GenericData$Record instance, which
>> I can use then to access array of instances, which is not what I'm looking
>> for(IIUC), since in that case I could have just use plain old JSON and
>> deserialize it using jackson having no schema evolution problems at all. If
>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>> if possible.
>> >
>> > What can be done? Or how schema evolution is intended to be used? I
>> found a lots of question searching for this answer.
>> >
>> > thanks!
>> > Martin.
>> >
>> > deserializer:
>> >
>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>> targetType,
>> >                                                                byte[]
>> data,
>> >                                                                boolean
>> useBinaryDecoder) {
>> >         try {
>> >             if (data == null) {
>> >                 return null;
>> >             }
>> >
>> >             log.trace("data='{}'",
>> DatatypeConverter.printHexBinary(data));
>> >
>> >             Schema schema = targetType.newInstance().getSchema();
>> >             DatumReader<GenericRecord> datumReader = new
>> SpecificDatumReader<>(schema);
>> >             Decoder decoder = useBinaryDecoder
>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>> String(data));
>> >
>> >             T result = targetType.cast(datumReader.read(null, decoder));
>> >             log.trace("deserialized data='{}'", result);
>> >             return result;
>> >         } catch (Exception ex) {
>> >             throw new SerializationException("Error deserializing
>> data", ex);
>> >         }
>> >     }
>> >
>> > serializer:
>> > public static <T extends SpecificRecordBase> byte[] serialize(T data,
>> boolean useBinaryDecoder, boolean pretty) {
>> >         try {
>> >             if (data == null) {
>> >                 return new byte[0];
>> >             }
>> >
>> >             log.debug("data='{}'", data);
>> >             Schema schema = data.getSchema();
>> >             ByteArrayOutputStream byteArrayOutputStream = new
>> ByteArrayOutputStream();
>> >             Encoder binaryEncoder = useBinaryDecoder
>> >                     ?
>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>> >                     : EncoderFactory.get().jsonEncoder(schema,
>> byteArrayOutputStream, pretty);
>> >
>> >             DatumWriter<GenericRecord> datumWriter = new
>> GenericDatumWriter<>(schema);
>> >             datumWriter.write(data, binaryEncoder);
>> >
>> >             binaryEncoder.flush();
>> >             byteArrayOutputStream.close();
>> >
>> >             byte[] result = byteArrayOutputStream.toByteArray();
>> >             log.debug("serialized data='{}'",
>> DatatypeConverter.printHexBinary(result));
>> >             return result;
>> >         } catch (IOException ex) {
>> >             throw new SerializationException(
>> >                     "Can't serialize data='" + data, ex);
>> >         }
>> >     }
>> >
>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>> >>
>> >> Hello!  Schema evolution relies on both the writer and reader schemas
>> >> being available.
>> >>
>> >> It looks like the allegro tool you are using is using the
>> >> GenericDatumReader that assumes the reader and writer schema are the
>> >> same:
>> >>
>> >>
>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>> >>
>> >> I do not believe that the "default" value is taken into account for
>> >> data that is strictly missing from the binary input, just when a field
>> >> is known to be in the reader schema but missing from the original
>> >> writer.
>> >>
>> >> You may have more luck reading the GenericRecord with a
>> >> GenericDatumReader with both schemas, and using the
>> >> `convertToJson(record)`.
>> >>
>> >> I hope this is useful -- Ryan
>> >>
>> >>
>> >>
>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>> >> >
>> >> > When reading through avro documentation, for example [1], I
>> understood, that schema evolution is supported, and if I added column with
>> specified default, it should be backwards compatible (and even forward when
>> I remove it again). Sounds great, so I added column defined as:
>> >> >
>> >> >         {
>> >> >           "name": "newColumn",
>> >> >           "type": ["null","string"],
>> >> >           "default": null,
>> >> >           "doc": "something wrong"
>> >> >         }
>> >> >
>> >> > and try to consumer some topic having this schema from beginning, it
>> fails with message:
>> >> >
>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>> >> >     at
>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>> >> >     at org.apache.avro.io
>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>> >> >     at org.apache.avro.io
>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> >> >     at
>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>> >> > to give a little bit more information. Avro schema defines one top
>> level type, having 2 fields. String describing type of message, and union
>> of N types. All N-1, non-modified types can be read, but one updated with
>> optional, default-having column cannot be read. I'm not sure if this design
>> is strictly speaking correct, but that's not the point (feel free to
>> criticise and recommend better approach!). I'm after schema evolution,
>> which seems not to be working.
>> >> >
>> >> >
>> >> > And if we alter type definition to:
>> >> >
>> >> > "type": "string",
>> >> > "default": ""
>> >> > it still does not work and generated error is:
>> >> >
>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>> Length is negative: -1
>> >> >     at org.apache.avro.io
>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>> >> >     at org.apache.avro.io
>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>> >> >     at org.apache.avro.io
>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> >> >     at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> >> >     at
>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>> >> >
>> >> > Am I doing something wrong?
>> >> >
>> >> > thanks,
>> >> > Martin.
>> >> >
>> >> > [1]
>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>
>

Re: Schema parses in C# Avro lib but not in Kafka Schema registry (assume that it is the java lib)

Posted by Patrick Farry <pa...@gmail.com>.
Thanks Ryan and Bryan.

Turned out it was user error :(. One of our guys made a tweak to the schema
before trying to upload it.

On Tue, Aug 6, 2019, 2:20 AM Ryan Skraba <ry...@skraba.com> wrote:

> Hello!  I successfully managed to load the schema into the confluent
> schema-registry version 5.3.0 (containing Avro 1.8.1), using the docker
> quickstart [1] and the command line:
>
> I just tested that the load worked -- I didn't try reading or writing
> binary.
>
> # Save the schema into registry.
> curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"
> --data "@PackageCreateInformationSchema.json"
> http://localhost:8081/subjects/PackageCreateInformation/versions
> # Fetch and check the schema from the registry.
> curl -X GET
> http://localhost:8081/subjects/PackageCreateInformation/versions/1
>
> The PackageCreateInformationSchema.json file is a bit weird, in the format
> {"schema": "your_schema_string_representation"} which takes a lot of
> escaping quotes (the exact contents I used follows if you want to
> reproduce).
>
> How are you loading the schema, and what version of the schema-registry
> are you using?  Perhaps we can narrow it down to a specific avro version
> and see if we can reproduce it outside of the registry.
>
> Best regards, Ryan
>
> [1]
> https://docs.confluent.io/current/quickstart/cos-docker-quickstart.html
>
> PackageCreateInformationSchema.json
>
>
> {"schema":"{\"type\":\"record\",\"name\":\"PackageCreateInformation\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"sendSms\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"trackingId\",\"type\":\"string\"},{\"name\":\"packageId\",\"type\":\"string\"},{\"name\":\"clientFacilityId\",\"type\":\"string\"},{\"name\":\"sortCode\",\"type\":\"string\"},{\"name\":\"ngsFacilityId\",\"type\":\"string\"},{\"name\":\"merchantId\",\"type\":\"string\"},{\"name\":\"ClassOfService\",\"type\":{\"type\":\"enum\",\"name\":\"ClassesOfService\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Ground\",\"Express\",\"FirstClass\",\"Priority\",\"BPM\",\"PBD3D\"]}},{\"name\":\"carrier\",\"type\":{\"type\":\"enum\",\"name\":\"Carriers\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"USPS\",\"DHL\",\"UPS\",\"FedEx\"]}},{\"name\":\"carrierClassOfService\",\"type\":{\"type\":\"enum\",\"name\":\"CarrierClassesOfService\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"ParcelSelect\",\"ParcelSelectLightweight\",\"FirstClass\",\"Priority\",\"BPM\"]}},{\"name\":\"weight\",\"type\":{\"type\":\"record\",\"name\":\"Weight\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"unitOfMeasure\",\"type\":{\"type\":\"enum\",\"name\":\"WeightUnitOfMeasure\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Pounds\",\"Ounces\",\"Kilograms\",\"Grams\"]}},{\"name\":\"measurmentValue\",\"type\":\"double\"}]}},{\"name\":\"dimensions\",\"type\":{\"type\":\"record\",\"name\":\"Dimensions\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"length\",\"type\":{\"type\":\"record\",\"name\":\"Length\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"unitOfMeasure\",\"type\":{\"type\":\"enum\",\"name\":\"LengthUnitOfMeasure\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Inches\",\"Meters\",\"Centimeters\"]}},{\"name\":\"measurmentValue\",\"type\":\"double\"}]}},{\"name\":\"width\",\"type\":\"Length\"},{\"name\":\"height\",\"type\":\"Length\"},{\"name\":\"girth\",\"type\":\"Length\"},{\"name\":\"isRectangular\",\"type\":\"boolean\"}]}},{\"name\":\"shipToAddress\",\"type\":{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"attention\",\"type\":[\"null\",\"string\"]},{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":[\"null\",\"string\"]},{\"name\":\"city\",\"type\":[\"null\",\"string\"]},{\"name\":\"stateOrProvince\",\"type\":[\"null\",\"string\"]},{\"name\":\"country\",\"type\":[\"null\",\"string\"]},{\"name\":\"isResidential\",\"type\":[\"null\",\"boolean\"]}]}},{\"name\":\"returnAddress\",\"type\":\"Address\"},{\"name\":\"ReferenceNumbers\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ReferenceNumbers\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"qualifier\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}}]},{\"name\":\"hazmatClasses\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Hazmat\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Explosives\",\"Gases\",\"FlammableCombustibleLiquids\",\"FlammableSolids\",\"OxidizingSubstancesOrganicPeroxides\",\"ToxicSubstancesAndInfectiousSubstances\",\"RadioactiveMaterial\",\"Corrosives\",\"ORMD\",\"ConsumerCommodities\",\"MiscellaneousHazardousMaterials\"]}}]},{\"name\":\"Subscription\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Subscription\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"sendSms\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"sendEmail\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Events\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Delivered\",\"InTransit\"]}}},{\"name\":\"emailAddress\",\"type\":\"string\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}]}]}"}
>
>
>
> On Tue, Aug 6, 2019 at 2:50 AM Brian Lachniet <bl...@gmail.com> wrote:
>
>> I don't see anything obviously wrong. I tried generating Java code from
>> this schema with *avro-tools *and had no problems.
>>
>> Unfortunately, I haven't had the opportunity to use the Confluent Schema
>> Registry yet. This comment
>> <https://github.com/confluentinc/schema-registry/issues/214#issuecomment-437514914>
>> seems to suggest that you might find more detailed error messages in the
>> schema registry server logs. Have you tried looking there?
>>
>> On Mon, Aug 5, 2019 at 7:10 PM Patrick Farry <pa...@gmail.com>
>> wrote:
>>
>>> The schema was generated from a protocol using the C# 1.9 code. When we
>>> try to load it into the Kafka schema registry we get an error that the
>>> schema is invalid.
>>>
>>> The only feedback we get is “Length is not valid”. If we remove Length
>>> then the error says that “Address is not valid” - so presumably there is
>>> something wrong referencing previously defined named types.
>>>
>>> Is there anything obviously wrong?
>>>
>>>
>>> {
>>>
>>> "type": "record",
>>>
>>>   "name": "PackageCreateInformation",
>>>
>>>   "namespace": "com.pb.fdr.delivery",
>>>
>>>   "fields": [
>>>
>>>     {
>>>
>>>       "name": "sendSms",
>>>
>>>       "type": [
>>>
>>>         "null",
>>>
>>>         "boolean"
>>>
>>>       ]
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "trackingId",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "packageId",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "clientFacilityId",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "sortCode",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "ngsFacilityId",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "merchantId",
>>>
>>>       "type": "string"
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "ClassOfService",
>>>
>>>       "type": {
>>>
>>>         "type": "enum",
>>>
>>>         "name": "ClassesOfService",
>>>
>>>         "namespace": "com.pb.fdr.delivery",
>>>
>>>         "symbols": [
>>>
>>>           "Ground",
>>>
>>>           "Express",
>>>
>>>           "FirstClass",
>>>
>>>           "Priority",
>>>
>>>           "BPM",
>>>
>>>           "PBD3D"
>>>
>>>         ]
>>>
>>>       }
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "carrier",
>>>
>>>       "type": {
>>>
>>>         "type": "enum",
>>>
>>>         "name": "Carriers",
>>>
>>>         "namespace": "com.pb.fdr.delivery",
>>>
>>>         "symbols": [
>>>
>>>           "USPS",
>>>
>>>           "DHL",
>>>
>>>           "UPS",
>>>
>>>           "FedEx"
>>>
>>>         ]
>>>
>>>       }
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "carrierClassOfService",
>>>
>>>       "type": {
>>>
>>>         "type": "enum",
>>>
>>>         "name": "CarrierClassesOfService",
>>>
>>>         "namespace": "com.pb.fdr.delivery",
>>>
>>>         "symbols": [
>>>
>>>           "ParcelSelect",
>>>
>>>           "ParcelSelectLightweight",
>>>
>>>           "FirstClass",
>>>
>>>           "Priority",
>>>
>>>           "BPM"
>>>
>>>         ]
>>>
>>>       }
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "weight",
>>>
>>>       "type": {
>>>
>>>         "type": "record",
>>>
>>>         "name": "Weight",
>>>
>>>         "namespace": "com.pb.fdr.delivery",
>>>
>>>         "fields": [
>>>
>>>           {
>>>
>>>             "name": "unitOfMeasure",
>>>
>>>             "type": {
>>>
>>>               "type": "enum",
>>>
>>>               "name": "WeightUnitOfMeasure",
>>>
>>>               "namespace": "com.pb.fdr.delivery",
>>>
>>>               "symbols": [
>>>
>>>                 "Pounds",
>>>
>>>                 "Ounces",
>>>
>>>                 "Kilograms",
>>>
>>>                 "Grams"
>>>
>>>               ]
>>>
>>>             }
>>>
>>>           },
>>>
>>>           {
>>>
>>>             "name": "measurmentValue",
>>>
>>>             "type": "double"
>>>
>>>           }
>>>
>>>         ]
>>>
>>>       }
>>>
>>>     },
>>>
>>>     {
>>>
>>>       "name": "dimensions",
>>>
>>>       "type": {
>>>
>>>         "type": "record",
>>>
>>>         "name": "Dimensions",
>>>
>>>         "namespace": "com.pb.fdr.delivery",
>>>
>>>         "fields": [
>>>
>>>           {
>>>
>>>             "name": "length",
>>>
>>>             "type": {
>>>
>>>               "type": "record",
>>>
>>>               "name": "Length",
>>>
>>>               "namespace": "com.pb.fdr.delivery",
>>>
>>>               "fields": [
>>>
>>>                 {
>>>
>>>                   "name": "unitOfMeasure",
>>>
>>>                   "type": {
>>>
>>>                     "type": "enum",
>>>
>>>                     "name": "LengthUnitOfMeasure",
>>>
>>>                     "namespace": "com.pb.fdr.delivery",
>>>
>>>                     "symbols": [
>>>
>>>                       "Inches",
>>>
>>>                       "Meters",
>>>
>>>                       "Centimeters"
>>>
>>>                     ]
>>>
>>>                   }
>>>
>>>                 },
>>>
>>>                 {
>>>
>>>                   "name": "measurmentValue",
>>>
>>>                   "type": "double"
>>>
>>>                 }
>>>
>>>               ]
>>>
>>>             }
>>>
>>>           },
>>>
>>>           {
>>>
>>>             "name": "width",
>>>
>>>             "type": "Length"
>>>
>>>           },
>>>
>>>           {
>>>
>>>             "name": "height",
>>>
>>>             "type": "Length"
>>>
>>>           },
>>>
>>>           {
>>>
>>>             "name": "girth",
>>>
>>>             "type": "Length"
>>>
>>> <div style="box-sizing:inherit;margin-left:32px;padding:0
>>>
>>

Re: Schema parses in C# Avro lib but not in Kafka Schema registry (assume that it is the java lib)

Posted by Ryan Skraba <ry...@skraba.com>.
Hello!  I successfully managed to load the schema into the confluent
schema-registry version 5.3.0 (containing Avro 1.8.1), using the docker
quickstart [1] and the command line:

I just tested that the load worked -- I didn't try reading or writing
binary.

# Save the schema into registry.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"
--data "@PackageCreateInformationSchema.json"
http://localhost:8081/subjects/PackageCreateInformation/versions
# Fetch and check the schema from the registry.
curl -X GET
http://localhost:8081/subjects/PackageCreateInformation/versions/1

The PackageCreateInformationSchema.json file is a bit weird, in the format
{"schema": "your_schema_string_representation"} which takes a lot of
escaping quotes (the exact contents I used follows if you want to
reproduce).

How are you loading the schema, and what version of the schema-registry are
you using?  Perhaps we can narrow it down to a specific avro version and
see if we can reproduce it outside of the registry.

Best regards, Ryan

[1] https://docs.confluent.io/current/quickstart/cos-docker-quickstart.html

PackageCreateInformationSchema.json

{"schema":"{\"type\":\"record\",\"name\":\"PackageCreateInformation\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"sendSms\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"trackingId\",\"type\":\"string\"},{\"name\":\"packageId\",\"type\":\"string\"},{\"name\":\"clientFacilityId\",\"type\":\"string\"},{\"name\":\"sortCode\",\"type\":\"string\"},{\"name\":\"ngsFacilityId\",\"type\":\"string\"},{\"name\":\"merchantId\",\"type\":\"string\"},{\"name\":\"ClassOfService\",\"type\":{\"type\":\"enum\",\"name\":\"ClassesOfService\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Ground\",\"Express\",\"FirstClass\",\"Priority\",\"BPM\",\"PBD3D\"]}},{\"name\":\"carrier\",\"type\":{\"type\":\"enum\",\"name\":\"Carriers\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"USPS\",\"DHL\",\"UPS\",\"FedEx\"]}},{\"name\":\"carrierClassOfService\",\"type\":{\"type\":\"enum\",\"name\":\"CarrierClassesOfService\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"ParcelSelect\",\"ParcelSelectLightweight\",\"FirstClass\",\"Priority\",\"BPM\"]}},{\"name\":\"weight\",\"type\":{\"type\":\"record\",\"name\":\"Weight\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"unitOfMeasure\",\"type\":{\"type\":\"enum\",\"name\":\"WeightUnitOfMeasure\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Pounds\",\"Ounces\",\"Kilograms\",\"Grams\"]}},{\"name\":\"measurmentValue\",\"type\":\"double\"}]}},{\"name\":\"dimensions\",\"type\":{\"type\":\"record\",\"name\":\"Dimensions\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"length\",\"type\":{\"type\":\"record\",\"name\":\"Length\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"unitOfMeasure\",\"type\":{\"type\":\"enum\",\"name\":\"LengthUnitOfMeasure\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Inches\",\"Meters\",\"Centimeters\"]}},{\"name\":\"measurmentValue\",\"type\":\"double\"}]}},{\"name\":\"width\",\"type\":\"Length\"},{\"name\":\"height\",\"type\":\"Length\"},{\"name\":\"girth\",\"type\":\"Length\"},{\"name\":\"isRectangular\",\"type\":\"boolean\"}]}},{\"name\":\"shipToAddress\",\"type\":{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"attention\",\"type\":[\"null\",\"string\"]},{\"name\":\"address1\",\"type\":\"string\"},{\"name\":\"address2\",\"type\":[\"null\",\"string\"]},{\"name\":\"city\",\"type\":[\"null\",\"string\"]},{\"name\":\"stateOrProvince\",\"type\":[\"null\",\"string\"]},{\"name\":\"country\",\"type\":[\"null\",\"string\"]},{\"name\":\"isResidential\",\"type\":[\"null\",\"boolean\"]}]}},{\"name\":\"returnAddress\",\"type\":\"Address\"},{\"name\":\"ReferenceNumbers\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ReferenceNumbers\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"qualifier\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}}]},{\"name\":\"hazmatClasses\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Hazmat\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Explosives\",\"Gases\",\"FlammableCombustibleLiquids\",\"FlammableSolids\",\"OxidizingSubstancesOrganicPeroxides\",\"ToxicSubstancesAndInfectiousSubstances\",\"RadioactiveMaterial\",\"Corrosives\",\"ORMD\",\"ConsumerCommodities\",\"MiscellaneousHazardousMaterials\"]}}]},{\"name\":\"Subscription\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Subscription\",\"namespace\":\"com.pb.fdr.delivery\",\"fields\":[{\"name\":\"sendSms\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"sendEmail\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"enum\",\"name\":\"Events\",\"namespace\":\"com.pb.fdr.delivery\",\"symbols\":[\"Delivered\",\"InTransit\"]}}},{\"name\":\"emailAddress\",\"type\":\"string\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}]}]}"}



On Tue, Aug 6, 2019 at 2:50 AM Brian Lachniet <bl...@gmail.com> wrote:

> I don't see anything obviously wrong. I tried generating Java code from
> this schema with *avro-tools *and had no problems.
>
> Unfortunately, I haven't had the opportunity to use the Confluent Schema
> Registry yet. This comment
> <https://github.com/confluentinc/schema-registry/issues/214#issuecomment-437514914>
> seems to suggest that you might find more detailed error messages in the
> schema registry server logs. Have you tried looking there?
>
> On Mon, Aug 5, 2019 at 7:10 PM Patrick Farry <pa...@gmail.com>
> wrote:
>
>> The schema was generated from a protocol using the C# 1.9 code. When we
>> try to load it into the Kafka schema registry we get an error that the
>> schema is invalid.
>>
>> The only feedback we get is “Length is not valid”. If we remove Length
>> then the error says that “Address is not valid” - so presumably there is
>> something wrong referencing previously defined named types.
>>
>> Is there anything obviously wrong?
>>
>>
>> {
>>
>> "type": "record",
>>
>>   "name": "PackageCreateInformation",
>>
>>   "namespace": "com.pb.fdr.delivery",
>>
>>   "fields": [
>>
>>     {
>>
>>       "name": "sendSms",
>>
>>       "type": [
>>
>>         "null",
>>
>>         "boolean"
>>
>>       ]
>>
>>     },
>>
>>     {
>>
>>       "name": "trackingId",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "packageId",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "clientFacilityId",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "sortCode",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "ngsFacilityId",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "merchantId",
>>
>>       "type": "string"
>>
>>     },
>>
>>     {
>>
>>       "name": "ClassOfService",
>>
>>       "type": {
>>
>>         "type": "enum",
>>
>>         "name": "ClassesOfService",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "symbols": [
>>
>>           "Ground",
>>
>>           "Express",
>>
>>           "FirstClass",
>>
>>           "Priority",
>>
>>           "BPM",
>>
>>           "PBD3D"
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "carrier",
>>
>>       "type": {
>>
>>         "type": "enum",
>>
>>         "name": "Carriers",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "symbols": [
>>
>>           "USPS",
>>
>>           "DHL",
>>
>>           "UPS",
>>
>>           "FedEx"
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "carrierClassOfService",
>>
>>       "type": {
>>
>>         "type": "enum",
>>
>>         "name": "CarrierClassesOfService",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "symbols": [
>>
>>           "ParcelSelect",
>>
>>           "ParcelSelectLightweight",
>>
>>           "FirstClass",
>>
>>           "Priority",
>>
>>           "BPM"
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "weight",
>>
>>       "type": {
>>
>>         "type": "record",
>>
>>         "name": "Weight",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "fields": [
>>
>>           {
>>
>>             "name": "unitOfMeasure",
>>
>>             "type": {
>>
>>               "type": "enum",
>>
>>               "name": "WeightUnitOfMeasure",
>>
>>               "namespace": "com.pb.fdr.delivery",
>>
>>               "symbols": [
>>
>>                 "Pounds",
>>
>>                 "Ounces",
>>
>>                 "Kilograms",
>>
>>                 "Grams"
>>
>>               ]
>>
>>             }
>>
>>           },
>>
>>           {
>>
>>             "name": "measurmentValue",
>>
>>             "type": "double"
>>
>>           }
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "dimensions",
>>
>>       "type": {
>>
>>         "type": "record",
>>
>>         "name": "Dimensions",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "fields": [
>>
>>           {
>>
>>             "name": "length",
>>
>>             "type": {
>>
>>               "type": "record",
>>
>>               "name": "Length",
>>
>>               "namespace": "com.pb.fdr.delivery",
>>
>>               "fields": [
>>
>>                 {
>>
>>                   "name": "unitOfMeasure",
>>
>>                   "type": {
>>
>>                     "type": "enum",
>>
>>                     "name": "LengthUnitOfMeasure",
>>
>>                     "namespace": "com.pb.fdr.delivery",
>>
>>                     "symbols": [
>>
>>                       "Inches",
>>
>>                       "Meters",
>>
>>                       "Centimeters"
>>
>>                     ]
>>
>>                   }
>>
>>                 },
>>
>>                 {
>>
>>                   "name": "measurmentValue",
>>
>>                   "type": "double"
>>
>>                 }
>>
>>               ]
>>
>>             }
>>
>>           },
>>
>>           {
>>
>>             "name": "width",
>>
>>             "type": "Length"
>>
>>           },
>>
>>           {
>>
>>             "name": "height",
>>
>>             "type": "Length"
>>
>>           },
>>
>>           {
>>
>>             "name": "girth",
>>
>>             "type": "Length"
>>
>>           },
>>
>>           {
>>
>>             "name": "isRectangular",
>>
>>             "type": "boolean"
>>
>>           }
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "shipToAddress",
>>
>>       "type": {
>>
>>         "type": "record",
>>
>>         "name": "Address",
>>
>>         "namespace": "com.pb.fdr.delivery",
>>
>>         "fields": [
>>
>>           {
>>
>>             "name": "name",
>>
>>             "type": "string"
>>
>>           },
>>
>>           {
>>
>>             "name": "attention",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "string"
>>
>>             ]
>>
>>           },
>>
>>           {
>>
>>             "name": "address1",
>>
>>             "type": "string"
>>
>>           },
>>
>>           {
>>
>>             "name": "address2",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "string"
>>
>>             ]
>>
>>           },
>>
>>           {
>>
>>             "name": "city",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "string"
>>
>>             ]
>>
>>           },
>>
>>           {
>>
>>             "name": "stateOrProvince",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "string"
>>
>>             ]
>>
>>           },
>>
>>           {
>>
>>             "name": "country",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "string"
>>
>>             ]
>>
>>           },
>>
>>           {
>>
>>             "name": "isResidential",
>>
>>             "type": [
>>
>>               "null",
>>
>>               "boolean"
>>
>>             ]
>>
>>           }
>>
>>         ]
>>
>>       }
>>
>>     },
>>
>>     {
>>
>>       "name": "returnAddress",
>>
>>       "type": "Address"
>>
>>     },
>>
>>     {
>>
>>       "name": "ReferenceNumbers",
>>
>>       "type": [
>>
>>         "null",
>>
>>         {
>>
>>           "type": "array",
>>
>>           "items": {
>>
>>             "type": "record",
>>
>>             "name": "ReferenceNumbers",
>>
>>             "namespace": "com.pb.fdr.delivery",
>>
>>             "fields": [
>>
>>               {
>>
>>                 "name": "qualifier",
>>
>>                 "type": "string"
>>
>>               },
>>
>>               {
>>
>>                 "name": "value",
>>
>>                 "type": "string"
>>
>>               }
>>
>>             ]
>>
>>           }
>>
>>         }
>>
>>       ]
>>
>>     },
>>
>>     {
>>
>>       "name": "hazmatClasses",
>>
>>       "type": [
>>
>>         "null",
>>
>>         {
>>
>>           "type": "array",
>>
>>           "items": {
>>
>>             "type": "enum",
>>
>>             "name": "Hazmat",
>>
>>             "namespace": "com.pb.fdr.delivery",
>>
>>             "symbols": [
>>
>>               "Explosives",
>>
>>               "Gases",
>>
>>               "FlammableCombustibleLiquids",
>>
>>               "FlammableSolids",
>>
>>               "OxidizingSubstancesOrganicPeroxides",
>>
>>               "ToxicSubstancesAndInfectiousSubstances",
>>
>>               "RadioactiveMaterial",
>>
>>               "Corrosives",
>>
>>               "ORMD",
>>
>>               "ConsumerCommodities",
>>
>>               "MiscellaneousHazardousMaterials"
>>
>>             ]
>>
>>           }
>>
>>         }
>>
>>       ]
>>
>>     },
>>
>>     {
>>
>>       "name": "Subscription",
>>
>>       "type": [
>>
>>         "null",
>>
>>         {
>>
>>           "type": "record",
>>
>>           "name": "Subscription",
>>
>>           "namespace": "com.pb.fdr.delivery",
>>
>>           "fields": [
>>
>>             {
>>
>>               "name": "sendSms",
>>
>>               "type": [
>>
>>                 "null",
>>
>>                 "boolean"
>>
>>               ]
>>
>>             },
>>
>>             {
>>
>>               "name": "sendEmail",
>>
>>               "type": [
>>
>>                 "null",
>>
>>                 "boolean"
>>
>>               ]
>>
>>             },
>>
>>             {
>>
>>               "name": "events",
>>
>>               "type": {
>>
>>                 "type": "array",
>>
>>                 "items": {
>>
>>                   "type": "enum",
>>
>>                   "name": "Events",
>>
>>                   "namespace": "com.pb.fdr.delivery",
>>
>>                   "symbols": [
>>
>>                     "Delivered",
>>
>>                     "InTransit"
>>
>>                   ]
>>
>>                 }
>>
>>               }
>>
>>             },
>>
>>             {
>>
>>               "name": "emailAddress",
>>
>>               "type": "string"
>>
>>             },
>>
>>             {
>>
>>               "name": "phoneNumber",
>>
>>               "type": "string"
>>
>>             }
>>
>>           ]
>>
>>         }
>>
>>       ]
>>
>>     }
>>
>>   ]
>>
>> }
>>
>>
>>
>
> --
>
> [image: 51b630b05e01a6d5134ccfd520f547c4.png]
>
> Brian Lachniet
>
> Software Engineer
>
> E: blachniet@gmail.com | blachniet.com <http://www.blachniet.com>
>
> <https://twitter.com/blachniet> <http://www.linkedin.com/in/blachniet>
>

Re: Schema parses in C# Avro lib but not in Kafka Schema registry (assume that it is the java lib)

Posted by Brian Lachniet <bl...@gmail.com>.
I don't see anything obviously wrong. I tried generating Java code from
this schema with *avro-tools *and had no problems.

Unfortunately, I haven't had the opportunity to use the Confluent Schema
Registry yet. This comment
<https://github.com/confluentinc/schema-registry/issues/214#issuecomment-437514914>
seems to suggest that you might find more detailed error messages in the
schema registry server logs. Have you tried looking there?

On Mon, Aug 5, 2019 at 7:10 PM Patrick Farry <pa...@gmail.com>
wrote:

> The schema was generated from a protocol using the C# 1.9 code. When we
> try to load it into the Kafka schema registry we get an error that the
> schema is invalid.
>
> The only feedback we get is “Length is not valid”. If we remove Length
> then the error says that “Address is not valid” - so presumably there is
> something wrong referencing previously defined named types.
>
> Is there anything obviously wrong?
>
>
> {
>
> "type": "record",
>
>   "name": "PackageCreateInformation",
>
>   "namespace": "com.pb.fdr.delivery",
>
>   "fields": [
>
>     {
>
>       "name": "sendSms",
>
>       "type": [
>
>         "null",
>
>         "boolean"
>
>       ]
>
>     },
>
>     {
>
>       "name": "trackingId",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "packageId",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "clientFacilityId",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "sortCode",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "ngsFacilityId",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "merchantId",
>
>       "type": "string"
>
>     },
>
>     {
>
>       "name": "ClassOfService",
>
>       "type": {
>
>         "type": "enum",
>
>         "name": "ClassesOfService",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "symbols": [
>
>           "Ground",
>
>           "Express",
>
>           "FirstClass",
>
>           "Priority",
>
>           "BPM",
>
>           "PBD3D"
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "carrier",
>
>       "type": {
>
>         "type": "enum",
>
>         "name": "Carriers",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "symbols": [
>
>           "USPS",
>
>           "DHL",
>
>           "UPS",
>
>           "FedEx"
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "carrierClassOfService",
>
>       "type": {
>
>         "type": "enum",
>
>         "name": "CarrierClassesOfService",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "symbols": [
>
>           "ParcelSelect",
>
>           "ParcelSelectLightweight",
>
>           "FirstClass",
>
>           "Priority",
>
>           "BPM"
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "weight",
>
>       "type": {
>
>         "type": "record",
>
>         "name": "Weight",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "fields": [
>
>           {
>
>             "name": "unitOfMeasure",
>
>             "type": {
>
>               "type": "enum",
>
>               "name": "WeightUnitOfMeasure",
>
>               "namespace": "com.pb.fdr.delivery",
>
>               "symbols": [
>
>                 "Pounds",
>
>                 "Ounces",
>
>                 "Kilograms",
>
>                 "Grams"
>
>               ]
>
>             }
>
>           },
>
>           {
>
>             "name": "measurmentValue",
>
>             "type": "double"
>
>           }
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "dimensions",
>
>       "type": {
>
>         "type": "record",
>
>         "name": "Dimensions",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "fields": [
>
>           {
>
>             "name": "length",
>
>             "type": {
>
>               "type": "record",
>
>               "name": "Length",
>
>               "namespace": "com.pb.fdr.delivery",
>
>               "fields": [
>
>                 {
>
>                   "name": "unitOfMeasure",
>
>                   "type": {
>
>                     "type": "enum",
>
>                     "name": "LengthUnitOfMeasure",
>
>                     "namespace": "com.pb.fdr.delivery",
>
>                     "symbols": [
>
>                       "Inches",
>
>                       "Meters",
>
>                       "Centimeters"
>
>                     ]
>
>                   }
>
>                 },
>
>                 {
>
>                   "name": "measurmentValue",
>
>                   "type": "double"
>
>                 }
>
>               ]
>
>             }
>
>           },
>
>           {
>
>             "name": "width",
>
>             "type": "Length"
>
>           },
>
>           {
>
>             "name": "height",
>
>             "type": "Length"
>
>           },
>
>           {
>
>             "name": "girth",
>
>             "type": "Length"
>
>           },
>
>           {
>
>             "name": "isRectangular",
>
>             "type": "boolean"
>
>           }
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "shipToAddress",
>
>       "type": {
>
>         "type": "record",
>
>         "name": "Address",
>
>         "namespace": "com.pb.fdr.delivery",
>
>         "fields": [
>
>           {
>
>             "name": "name",
>
>             "type": "string"
>
>           },
>
>           {
>
>             "name": "attention",
>
>             "type": [
>
>               "null",
>
>               "string"
>
>             ]
>
>           },
>
>           {
>
>             "name": "address1",
>
>             "type": "string"
>
>           },
>
>           {
>
>             "name": "address2",
>
>             "type": [
>
>               "null",
>
>               "string"
>
>             ]
>
>           },
>
>           {
>
>             "name": "city",
>
>             "type": [
>
>               "null",
>
>               "string"
>
>             ]
>
>           },
>
>           {
>
>             "name": "stateOrProvince",
>
>             "type": [
>
>               "null",
>
>               "string"
>
>             ]
>
>           },
>
>           {
>
>             "name": "country",
>
>             "type": [
>
>               "null",
>
>               "string"
>
>             ]
>
>           },
>
>           {
>
>             "name": "isResidential",
>
>             "type": [
>
>               "null",
>
>               "boolean"
>
>             ]
>
>           }
>
>         ]
>
>       }
>
>     },
>
>     {
>
>       "name": "returnAddress",
>
>       "type": "Address"
>
>     },
>
>     {
>
>       "name": "ReferenceNumbers",
>
>       "type": [
>
>         "null",
>
>         {
>
>           "type": "array",
>
>           "items": {
>
>             "type": "record",
>
>             "name": "ReferenceNumbers",
>
>             "namespace": "com.pb.fdr.delivery",
>
>             "fields": [
>
>               {
>
>                 "name": "qualifier",
>
>                 "type": "string"
>
>               },
>
>               {
>
>                 "name": "value",
>
>                 "type": "string"
>
>               }
>
>             ]
>
>           }
>
>         }
>
>       ]
>
>     },
>
>     {
>
>       "name": "hazmatClasses",
>
>       "type": [
>
>         "null",
>
>         {
>
>           "type": "array",
>
>           "items": {
>
>             "type": "enum",
>
>             "name": "Hazmat",
>
>             "namespace": "com.pb.fdr.delivery",
>
>             "symbols": [
>
>               "Explosives",
>
>               "Gases",
>
>               "FlammableCombustibleLiquids",
>
>               "FlammableSolids",
>
>               "OxidizingSubstancesOrganicPeroxides",
>
>               "ToxicSubstancesAndInfectiousSubstances",
>
>               "RadioactiveMaterial",
>
>               "Corrosives",
>
>               "ORMD",
>
>               "ConsumerCommodities",
>
>               "MiscellaneousHazardousMaterials"
>
>             ]
>
>           }
>
>         }
>
>       ]
>
>     },
>
>     {
>
>       "name": "Subscription",
>
>       "type": [
>
>         "null",
>
>         {
>
>           "type": "record",
>
>           "name": "Subscription",
>
>           "namespace": "com.pb.fdr.delivery",
>
>           "fields": [
>
>             {
>
>               "name": "sendSms",
>
>               "type": [
>
>                 "null",
>
>                 "boolean"
>
>               ]
>
>             },
>
>             {
>
>               "name": "sendEmail",
>
>               "type": [
>
>                 "null",
>
>                 "boolean"
>
>               ]
>
>             },
>
>             {
>
>               "name": "events",
>
>               "type": {
>
>                 "type": "array",
>
>                 "items": {
>
>                   "type": "enum",
>
>                   "name": "Events",
>
>                   "namespace": "com.pb.fdr.delivery",
>
>                   "symbols": [
>
>                     "Delivered",
>
>                     "InTransit"
>
>                   ]
>
>                 }
>
>               }
>
>             },
>
>             {
>
>               "name": "emailAddress",
>
>               "type": "string"
>
>             },
>
>             {
>
>               "name": "phoneNumber",
>
>               "type": "string"
>
>             }
>
>           ]
>
>         }
>
>       ]
>
>     }
>
>   ]
>
> }
>
>
>

-- 

[image: 51b630b05e01a6d5134ccfd520f547c4.png]

Brian Lachniet

Software Engineer

E: blachniet@gmail.com | blachniet.com <http://www.blachniet.com>

<https://twitter.com/blachniet> <http://www.linkedin.com/in/blachniet>

Schema parses in C# Avro lib but not in Kafka Schema registry (assume that it is the java lib)

Posted by Patrick Farry <pa...@gmail.com>.
The schema was generated from a protocol using the C# 1.9 code. When we try to load it into the Kafka schema registry we get an error that the schema is invalid.

The only feedback we get is “Length is not valid”. If we remove Length then the error says that “Address is not valid” - so presumably there is something wrong referencing previously defined named types.

Is there anything obviously wrong? 


{  
"type": "record",
  "name": "PackageCreateInformation",
  "namespace": "com.pb.fdr.delivery",
  "fields": [
    {
      "name": "sendSms",
      "type": [
        "null",
        "boolean"
      ]
    },
    {
      "name": "trackingId",
      "type": "string"
    },
    {
      "name": "packageId",
      "type": "string"
    },
    {
      "name": "clientFacilityId",
      "type": "string"
    },
    {
      "name": "sortCode",
      "type": "string"
    },
    {
      "name": "ngsFacilityId",
      "type": "string"
    },
    {
      "name": "merchantId",
      "type": "string"
    },
    {
      "name": "ClassOfService",
      "type": {
        "type": "enum",
        "name": "ClassesOfService",
        "namespace": "com.pb.fdr.delivery",
        "symbols": [
          "Ground",
          "Express",
          "FirstClass",
          "Priority",
          "BPM",
          "PBD3D"
        ]
      }
    },
    {
      "name": "carrier",
      "type": {
        "type": "enum",
        "name": "Carriers",
        "namespace": "com.pb.fdr.delivery",
        "symbols": [
          "USPS",
          "DHL",
          "UPS",
          "FedEx"
        ]
      }
    },
    {
      "name": "carrierClassOfService",
      "type": {
        "type": "enum",
        "name": "CarrierClassesOfService",
        "namespace": "com.pb.fdr.delivery",
        "symbols": [
          "ParcelSelect",
          "ParcelSelectLightweight",
          "FirstClass",
          "Priority",
          "BPM"
        ]
      }
    },
    {
      "name": "weight",
      "type": {
        "type": "record",
        "name": "Weight",
        "namespace": "com.pb.fdr.delivery",
        "fields": [
          {
            "name": "unitOfMeasure",
            "type": {
              "type": "enum",
              "name": "WeightUnitOfMeasure",
              "namespace": "com.pb.fdr.delivery",
              "symbols": [
                "Pounds",
                "Ounces",
                "Kilograms",
                "Grams"
              ]
            }
          },
          {
            "name": "measurmentValue",
            "type": "double"
          }
        ]
      }
    },
    {
      "name": "dimensions",
      "type": {
        "type": "record",
        "name": "Dimensions",
        "namespace": "com.pb.fdr.delivery",
        "fields": [
          {
            "name": "length",
            "type": {
              "type": "record",
              "name": "Length",
              "namespace": "com.pb.fdr.delivery",
              "fields": [
                {
                  "name": "unitOfMeasure",
                  "type": {
                    "type": "enum",
                    "name": "LengthUnitOfMeasure",
                    "namespace": "com.pb.fdr.delivery",
                    "symbols": [
                      "Inches",
                      "Meters",
                      "Centimeters"
                    ]
                  }
                },
                {
                  "name": "measurmentValue",
                  "type": "double"
                }
              ]
            }
          },
          {
            "name": "width",
            "type": "Length"
          },
          {
            "name": "height",
            "type": "Length"
          },
          {
            "name": "girth",
            "type": "Length"
          },
          {
            "name": "isRectangular",
            "type": "boolean"
          }
        ]
      }
    },
    {
      "name": "shipToAddress",
      "type": {
        "type": "record",
        "name": "Address",
        "namespace": "com.pb.fdr.delivery",
        "fields": [
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "attention",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "address1",
            "type": "string"
          },
          {
            "name": "address2",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "city",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "stateOrProvince",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "country",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "isResidential",
            "type": [
              "null",
              "boolean"
            ]
          }
        ]
      }
    },
    {
      "name": "returnAddress",
      "type": "Address"
    },
    {
      "name": "ReferenceNumbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "ReferenceNumbers",
            "namespace": "com.pb.fdr.delivery",
            "fields": [
              {
                "name": "qualifier",
                "type": "string"
              },
              {
                "name": "value",
                "type": "string"
              }
            ]
          }
        }
      ]
    },
    {
      "name": "hazmatClasses",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "enum",
            "name": "Hazmat",
            "namespace": "com.pb.fdr.delivery",
            "symbols": [
              "Explosives",
              "Gases",
              "FlammableCombustibleLiquids",
              "FlammableSolids",
              "OxidizingSubstancesOrganicPeroxides",
              "ToxicSubstancesAndInfectiousSubstances",
              "RadioactiveMaterial",
              "Corrosives",
              "ORMD",
              "ConsumerCommodities",
              "MiscellaneousHazardousMaterials"
            ]
          }
        }
      ]
    },
    {
      "name": "Subscription",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Subscription",
          "namespace": "com.pb.fdr.delivery",
          "fields": [
            {
              "name": "sendSms",
              "type": [
                "null",
                "boolean"
              ]
            },
            {
              "name": "sendEmail",
              "type": [
                "null",
                "boolean"
              ]
            },
            {
              "name": "events",
              "type": {
                "type": "array",
                "items": {
                  "type": "enum",
                  "name": "Events",
                  "namespace": "com.pb.fdr.delivery",
                  "symbols": [
                    "Delivered",
                    "InTransit"
                  ]
                }
              }
            },
            {
              "name": "emailAddress",
              "type": "string"
            },
            {
              "name": "phoneNumber",
              "type": "string"
            }
          ]
        }
      ]
    }
  ]
}


Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Ryan Skraba <ry...@skraba.com>.
Hello!  This is a good discussion.  For your question:

> when sending avro bytes (obtained by provided serializer[1]), they are or can be somehow paired with schema used to serialize data

The answer is no, not in the serializer method you've provided -- it
serializes *only* the data without any reference to the schema.  This
is by design in Avro, with the assumption that the writer schema is
communicated independently from the data.  In the case of Avro
container files, it's in the file header for example.  For messaging
use cases, the schema must be shared between producers and consumers
by some other route.

If you're already using Kafka, the confluent schema registry is open
source and the documentation is quite good:
https://github.com/confluentinc/schema-registry  It adds a 5 byte
header to the binary data to identify the writer schema and provides
the tools to store and retrieve these writer schemas in the existing
kafka infrastructure.

I noticed that in 1.8.2, the Avro specification was extended to add a
"single object encoding" and I've been meaning to take a closer look
at it:  http://avro.apache.org/docs/1.8.2/spec.html#single_object_encoding

This also provides a mechanism to uniquely identify the writer schema
by adding its unique fingerprint to the bytes, but leave the mechanics
of storing and finding the schema up to you.  I imagine that you could
do an in-memory implementation if you know *all* of the writer schemas
in advance.

In Java, the implementation of the message coder and decoder is here
(where the schema store is the "registry"):
https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/message/BinaryMessageDecoder.html#BinaryMessageDecoder(org.apache.avro.generic.GenericData,%20org.apache.avro.Schema,%20org.apache.avro.message.SchemaStore)

For both confluent schema registry and the single object encoding, the
byte[] is no longer "just pure data" since it now includes the writer
schema id.

I'd be interesting in hearing if these could help solve your problem!

On Thu, Aug 1, 2019 at 7:24 PM Svante Karlsson <sv...@csi.se> wrote:
>
> First of all you can use confluents schema registry as you which - it's not in the paid bundle as long as you are not hosting kafka as a service (ie amazon et al). And I would recommend you to. It's good and trivial to operate.
>
> Second,  take a look at the serializer in my pet project at:
> https://github.com/bitbouncer/kspp/blob/master/include/kspp/avro/avro_serdes.h:96
>
> Note that this encoder/decoder does not support schema evolution but it discovers the actual written schema and gets a "avro::ValidSchema" from the schema registry on read. And this is what you need.
>
> This is of course c++ but you can probably figure out what you need to do.
>
> In the end you will need a rest/grpc service somewhere that your serializer can use to get an in that you can refer to across your infrastructure. I did write one some years ago but reverted to confluents since most people use that.
>
> /svante
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Den tors 1 aug. 2019 kl 18:05 skrev Martin Mucha <al...@gmail.com>:
>>
>> Thanks for answer!
>>
>> Ad: "which byte[] are we talking about?" — actually I don't know. Please lets break it down together.
>>
>> I'm pretty sure, that we're not using confluent platform(iiuc the paid bundle, right?). I shared some serializer before [1], so you're saying, that this wont include neither schema ID, nor schema OK? Ok, lets assume that. Next. We're using SpringKafka project, to get this serialized data and send them over kafka. So we don't have any schema registry, but in principle it could be possible to include schema within each message. But I cannot see how that could be done. SpringKafka requires us to provide him org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG, which we did, but it's just a class calling serializer [1], and from that point on I have no idea how it could figure out used schema. The question here I'm asking is, whether when sending avro bytes (obtained by provided serializer[1]), they are or can be somehow paired with schema used to serialize data? Is this what kafka senders do, or can do? Include ID/whole schema somewhere in headers or ...??? And when I read kafka messages, will the schema be (or could be) somewhere stored in ConsumerRecord or somewhere like that?
>>
>> sorry for confused questions, but I'm really missing knowledge to even ask properly.
>>
>> thanks,
>> Martin.
>>
>> [1]
>> public static <T extends SpecificRecordBase> byte[] serialize(T data, boolean useBinaryDecoder, boolean pretty) {
>>         try {
>>             if (data == null) {
>>                 return new byte[0];
>>             }
>>
>>             log.debug("data='{}'", data);
>>             Schema schema = data.getSchema();
>>             ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>>             Encoder binaryEncoder = useBinaryDecoder
>>                     ? EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>                     : EncoderFactory.get().jsonEncoder(schema, byteArrayOutputStream, pretty);
>>
>>             DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
>>             datumWriter.write(data, binaryEncoder);
>>
>>             binaryEncoder.flush();
>>             byteArrayOutputStream.close();
>>
>>             byte[] result = byteArrayOutputStream.toByteArray();
>>             log.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
>>             return result;
>>         } catch (IOException ex) {
>>             throw new SerializationException(
>>                     "Can't serialize data='" + data, ex);
>>         }
>>     }
>>
>> čt 1. 8. 2019 v 17:06 odesílatel Svante Karlsson <sv...@csi.se> napsal:
>>>
>>> For clarity: What byte[] are we talking about?
>>>
>>> You are slightly missing my point if we are speaking about kafka.
>>>
>>> Confluent encoding:
>>> <byte> <int32_t>    <avro_binary_payload>
>>> 0          schema_id  avro
>>>
>>> avro_binary_payload does not in any case contain the schema or schema id. The schema id is a confluent thing. (in an avrofile the schema is prepended by value in the file)
>>>
>>> While it's trivial to build a schema registry that for example instead gives you a md5 hash of the schema you have to use it throughout your infrastructure OR use known reader and writer schema (ie hardcoded).
>>>
>>> In confluent world the id=N is the N+1'th registered schema in the database (a kafka topic) if I remember right. Loose that database and you cannot read your kafka topics.
>>>
>>> So you have to use some other encoder, homegrown or not that embeds either the full schema in every message (expensive) of some id. Does this make sense?
>>>
>>> /svante
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Den tors 1 aug. 2019 kl 16:38 skrev Martin Mucha <al...@gmail.com>:
>>>>
>>>> Thanks for answer.
>>>>
>>>> What I knew already is, that in each message there is _somehow_ present either _some_ schema ID or full schema. I saw some byte array manipulations to get _somehow_ defined schema ID from byte[], which worked, but that's definitely not how it should be done. What I'm looking for is some documentation of _how_ to do these things right. I really cannot find a single thing, yet there must be some util functions, or anything. Is there some devel-first-steps page, where can I find answers for:
>>>>
>>>> * How to test, whether byte[] contains full schema or just id?
>>>> * How to control, whether message is serialized with ID or with full schema?
>>>> * how to get ID from byte[]?
>>>> * how to get full schema from byte[]?
>>>>
>>>> I don't have confluent platform, and cannot have it, but implementing "get schema by ID" should be easy task, provided, that I have that ID. In my scenario I know, that message will be written using one schema, just different versions of it. So I just need to know, which version it is, so that I can configure deserializer to enable schema evolution.
>>>>
>>>> thanks in advance,
>>>> Martin
>>>>
>>>> čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <sv...@csi.se> napsal:
>>>>>
>>>>> In an avrofile the schema is in the beginning but if you refer a single record serialization like Kafka then you have to add something that you can use to get hold of the schema. Confluents avroencoder for Kafka uses confluents schema registry that uses int32 as schema Id. This is prepended (+a magic byte) to the binary avro. Thus using the schema registry again you can get the writer schema.
>>>>>
>>>>> /Svante
>>>>>
>>>>> On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> just one more question, not strictly related to the subject.
>>>>>>
>>>>>> Initially I though I'd be OK with using some initial version of schema in place of writer schema. That works, but all columns from schema older than this initial one would be just ignored. So I need to know EXACTLY the schema, which writer used. I know, that avro messages contains either full schema or at least it's ID. Can you point me to the documentation, where this is discussed? So in my deserializer I have byte[] as a input, from which I need to get the schema information first, in order to be able to deserialize the record. I really do not know how to do that, I'm pretty sure I never saw this anywhere, and I cannot find it anywhere. But in principle it must be possible, since reader need not necessarily have any control of which schema writer used.
>>>>>>
>>>>>> thanks a lot.
>>>>>> M.
>>>>>>
>>>>>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com> napsal:
>>>>>>>
>>>>>>> Thank you very much for in depth answer. I understand how it works now better, will test it shortly.
>>>>>>> Thank you for your time.
>>>>>>>
>>>>>>> Martin.
>>>>>>>
>>>>>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>>>>>>>>
>>>>>>>> Hello!  It's the same issue in your example code as allegro, even with
>>>>>>>> the SpecificDatumReader.
>>>>>>>>
>>>>>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>>>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema, schema)
>>>>>>>>
>>>>>>>> In Avro, the original schema is commonly known as the writer schema
>>>>>>>> (the instance that originally wrote the binary data).  Schema
>>>>>>>> evolution applies when you are using the constructor of the
>>>>>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>>>>>
>>>>>>>> As a concrete example, if your original schema was:
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "type": "record",
>>>>>>>>   "name": "Simple",
>>>>>>>>   "fields": [
>>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>>     {"name": "name","type": "string"}
>>>>>>>>   ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> And you added a field:
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "type": "record",
>>>>>>>>   "name": "SimpleV2",
>>>>>>>>   "fields": [
>>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>>     {"name": "name", "type": "string"},
>>>>>>>>     {"name": "description","type": ["null", "string"]}
>>>>>>>>   ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>>>>>> classes are generated from the avro-maven-plugin:
>>>>>>>>
>>>>>>>> @Test
>>>>>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>>>>>
>>>>>>>>   // Read as Simple v2, same as your method but with the writer and
>>>>>>>> reader schema.
>>>>>>>>   DatumReader<SimpleV2> datumReader =
>>>>>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>>>>>> SimpleV2.getClassSchema());
>>>>>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>>>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>>>>>
>>>>>>>>   assertThat(v2.getId(), is(1));
>>>>>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>>>>>   assertThat(v2.getDescription(), nullValue());
>>>>>>>> }
>>>>>>>>
>>>>>>>> This demonstrates with two different schemas and SpecificRecords in
>>>>>>>> the same test, but the same principle applies if it's the same record
>>>>>>>> that has evolved -- you need to know the original schema that wrote
>>>>>>>> the data in order to apply the schema that you're now using for
>>>>>>>> reading.
>>>>>>>>
>>>>>>>> I hope this clarifies what you are looking for!
>>>>>>>>
>>>>>>>> All my best, Ryan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > Thanks for answer.
>>>>>>>> >
>>>>>>>> > Actually I have exactly the same behavior with avro 1.9.0 and following deserializer in our other app, which uses strictly avro codebase, and failing with same exceptions. So lets leave "allegro" library and lots of other tools out of it in our discussion.
>>>>>>>> > I can use whichever aproach. All I need is single way, where I can deserialize byte[] into class generated by avro-maven-plugin, and which will respect documentation regarding schema evolution. Currently we're using following deserializer and serializer, and these does not work when it comes to schema evolution. What is the correct way to serialize and deserializer avro data?
>>>>>>>> >
>>>>>>>> > I probably don't understand your mention about GenericRecord or GenericDatumReader. I tried to use GenericDatumReader in deserializer below, but then it seems I got back just GenericData$Record instance, which I can use then to access array of instances, which is not what I'm looking for(IIUC), since in that case I could have just use plain old JSON and deserialize it using jackson having no schema evolution problems at all. If that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it if possible.
>>>>>>>> >
>>>>>>>> > What can be done? Or how schema evolution is intended to be used? I found a lots of question searching for this answer.
>>>>>>>> >
>>>>>>>> > thanks!
>>>>>>>> > Martin.
>>>>>>>> >
>>>>>>>> > deserializer:
>>>>>>>> >
>>>>>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T> targetType,
>>>>>>>> >                                                                byte[] data,
>>>>>>>> >                                                                boolean useBinaryDecoder) {
>>>>>>>> >         try {
>>>>>>>> >             if (data == null) {
>>>>>>>> >                 return null;
>>>>>>>> >             }
>>>>>>>> >
>>>>>>>> >             log.trace("data='{}'", DatatypeConverter.printHexBinary(data));
>>>>>>>> >
>>>>>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>>>>>> >             DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
>>>>>>>> >             Decoder decoder = useBinaryDecoder
>>>>>>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>>>>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new String(data));
>>>>>>>> >
>>>>>>>> >             T result = targetType.cast(datumReader.read(null, decoder));
>>>>>>>> >             log.trace("deserialized data='{}'", result);
>>>>>>>> >             return result;
>>>>>>>> >         } catch (Exception ex) {
>>>>>>>> >             throw new SerializationException("Error deserializing data", ex);
>>>>>>>> >         }
>>>>>>>> >     }
>>>>>>>> >
>>>>>>>> > serializer:
>>>>>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data, boolean useBinaryDecoder, boolean pretty) {
>>>>>>>> >         try {
>>>>>>>> >             if (data == null) {
>>>>>>>> >                 return new byte[0];
>>>>>>>> >             }
>>>>>>>> >
>>>>>>>> >             log.debug("data='{}'", data);
>>>>>>>> >             Schema schema = data.getSchema();
>>>>>>>> >             ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>>>>>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>>>>>> >                     ? EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>>>>>> >                     : EncoderFactory.get().jsonEncoder(schema, byteArrayOutputStream, pretty);
>>>>>>>> >
>>>>>>>> >             DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
>>>>>>>> >             datumWriter.write(data, binaryEncoder);
>>>>>>>> >
>>>>>>>> >             binaryEncoder.flush();
>>>>>>>> >             byteArrayOutputStream.close();
>>>>>>>> >
>>>>>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>>>>>> >             log.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
>>>>>>>> >             return result;
>>>>>>>> >         } catch (IOException ex) {
>>>>>>>> >             throw new SerializationException(
>>>>>>>> >                     "Can't serialize data='" + data, ex);
>>>>>>>> >         }
>>>>>>>> >     }
>>>>>>>> >
>>>>>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>>>>>>>> >>
>>>>>>>> >> Hello!  Schema evolution relies on both the writer and reader schemas
>>>>>>>> >> being available.
>>>>>>>> >>
>>>>>>>> >> It looks like the allegro tool you are using is using the
>>>>>>>> >> GenericDatumReader that assumes the reader and writer schema are the
>>>>>>>> >> same:
>>>>>>>> >>
>>>>>>>> >> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>>>>>> >>
>>>>>>>> >> I do not believe that the "default" value is taken into account for
>>>>>>>> >> data that is strictly missing from the binary input, just when a field
>>>>>>>> >> is known to be in the reader schema but missing from the original
>>>>>>>> >> writer.
>>>>>>>> >>
>>>>>>>> >> You may have more luck reading the GenericRecord with a
>>>>>>>> >> GenericDatumReader with both schemas, and using the
>>>>>>>> >> `convertToJson(record)`.
>>>>>>>> >>
>>>>>>>> >> I hope this is useful -- Ryan
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com> wrote:
>>>>>>>> >> >
>>>>>>>> >> > Hi,
>>>>>>>> >> >
>>>>>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>>>>>> >> >
>>>>>>>> >> > When reading through avro documentation, for example [1], I understood, that schema evolution is supported, and if I added column with specified default, it should be backwards compatible (and even forward when I remove it again). Sounds great, so I added column defined as:
>>>>>>>> >> >
>>>>>>>> >> >         {
>>>>>>>> >> >           "name": "newColumn",
>>>>>>>> >> >           "type": ["null","string"],
>>>>>>>> >> >           "default": null,
>>>>>>>> >> >           "doc": "something wrong"
>>>>>>>> >> >         }
>>>>>>>> >> >
>>>>>>>> >> > and try to consumer some topic having this schema from beginning, it fails with message:
>>>>>>>> >> >
>>>>>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>>>>>> >> >     at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>>>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>>> >> >     at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>>> >> > to give a little bit more information. Avro schema defines one top level type, having 2 fields. String describing type of message, and union of N types. All N-1, non-modified types can be read, but one updated with optional, default-having column cannot be read. I'm not sure if this design is strictly speaking correct, but that's not the point (feel free to criticise and recommend better approach!). I'm after schema evolution, which seems not to be working.
>>>>>>>> >> >
>>>>>>>> >> >
>>>>>>>> >> > And if we alter type definition to:
>>>>>>>> >> >
>>>>>>>> >> > "type": "string",
>>>>>>>> >> > "default": ""
>>>>>>>> >> > it still does not work and generated error is:
>>>>>>>> >> >
>>>>>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
>>>>>>>> >> >     at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>>>>>> >> >     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>>> >> >     at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>>> >> >
>>>>>>>> >> > Am I doing something wrong?
>>>>>>>> >> >
>>>>>>>> >> > thanks,
>>>>>>>> >> > Martin.
>>>>>>>> >> >
>>>>>>>> >> > [1] https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules

Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Svante Karlsson <sv...@csi.se>.
First of all you can use confluents schema registry as you which - it's not
in the paid bundle as long as you are not hosting kafka as a service (ie
amazon et al). And I would recommend you to. It's good and trivial to
operate.

Second,  take a look at the serializer in my pet project at:
https://github.com/bitbouncer/kspp/blob/master/include/kspp/avro/avro_serdes.h
:96

Note that this encoder/decoder does not support schema evolution but it
discovers the actual written schema and gets a "avro::ValidSchema" from the
schema registry on read. And this is what you need.

This is of course c++ but you can probably figure out what you need to do.

In the end you will need a rest/grpc service somewhere that your serializer
can use to get an in that you can refer to across your infrastructure. I
did write one some years ago but reverted to confluents since most people
use that.

/svante














Den tors 1 aug. 2019 kl 18:05 skrev Martin Mucha <al...@gmail.com>:

> Thanks for answer!
>
> Ad: "which byte[] are we talking about?" — actually I don't know. Please
> lets break it down together.
>
> I'm pretty sure, that we're not using confluent platform(iiuc the paid
> bundle, right?). I shared some serializer before [1], so you're saying,
> that this wont include neither schema ID, nor schema OK? Ok, lets assume
> that. Next. We're using SpringKafka project, to get this serialized data
> and send them over kafka. So we don't have any schema registry, but in
> principle it could be possible to include schema within each message. But I
> cannot see how that could be done. SpringKafka requires us to provide
> him org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG,
> which we did, but it's just a class calling serializer [1], and from that
> point on I have no idea how it could figure out used schema. The question
> here I'm asking is, whether when sending avro bytes (obtained by provided
> serializer[1]), they are or can be somehow paired with schema used to
> serialize data? Is this what kafka senders do, or can do? Include ID/whole
> schema somewhere in headers or ...??? And when I read kafka messages, will
> the schema be (or could be) somewhere stored in ConsumerRecord or somewhere
> like that?
>
> sorry for confused questions, but I'm really missing knowledge to even ask
> properly.
>
> thanks,
> Martin.
>
> [1]
> public static <T extends SpecificRecordBase> byte[] serialize(T data,
> boolean useBinaryDecoder, boolean pretty) {
>         try {
>             if (data == null) {
>                 return new byte[0];
>             }
>
>             log.debug("data='{}'", data);
>             Schema schema = data.getSchema();
>             ByteArrayOutputStream byteArrayOutputStream = new
> ByteArrayOutputStream();
>             Encoder binaryEncoder = useBinaryDecoder
>                     ?
> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>                     : EncoderFactory.get().jsonEncoder(schema,
> byteArrayOutputStream, pretty);
>
>             DatumWriter<GenericRecord> datumWriter = new
> GenericDatumWriter<>(schema);
>             datumWriter.write(data, binaryEncoder);
>
>             binaryEncoder.flush();
>             byteArrayOutputStream.close();
>
>             byte[] result = byteArrayOutputStream.toByteArray();
>             log.debug("serialized data='{}'",
> DatatypeConverter.printHexBinary(result));
>             return result;
>         } catch (IOException ex) {
>             throw new SerializationException(
>                     "Can't serialize data='" + data, ex);
>         }
>     }
>
> čt 1. 8. 2019 v 17:06 odesílatel Svante Karlsson <sv...@csi.se>
> napsal:
>
>> For clarity: What byte[] are we talking about?
>>
>> You are slightly missing my point if we are speaking about kafka.
>>
>> Confluent encoding:
>> <byte> <int32_t>    <avro_binary_payload>
>> 0          schema_id  avro
>>
>> avro_binary_payload does not in any case contain the schema or schema id.
>> The schema id is a confluent thing. (in an avrofile the schema is prepended
>> by value in the file)
>>
>> While it's trivial to build a schema registry that for example instead
>> gives you a md5 hash of the schema you have to use it throughout your
>> infrastructure OR use known reader and writer schema (ie hardcoded).
>>
>> In confluent world the id=N is the N+1'th registered schema in the
>> database (a kafka topic) if I remember right. Loose that database and you
>> cannot read your kafka topics.
>>
>> So you have to use some other encoder, homegrown or not that embeds
>> either the full schema in every message (expensive) of some id. Does this
>> make sense?
>>
>> /svante
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Den tors 1 aug. 2019 kl 16:38 skrev Martin Mucha <al...@gmail.com>:
>>
>>> Thanks for answer.
>>>
>>> What I knew already is, that in each message there is _somehow_ present
>>> either _some_ schema ID or full schema. I saw some byte array manipulations
>>> to get _somehow_ defined schema ID from byte[], which worked, but that's
>>> definitely not how it should be done. What I'm looking for is some
>>> documentation of _how_ to do these things right. I really cannot find a
>>> single thing, yet there must be some util functions, or anything. Is there
>>> some devel-first-steps page, where can I find answers for:
>>>
>>> * How to test, whether byte[] contains full schema or just id?
>>> * How to control, whether message is serialized with ID or with full
>>> schema?
>>> * how to get ID from byte[]?
>>> * how to get full schema from byte[]?
>>>
>>> I don't have confluent platform, and cannot have it, but implementing
>>> "get schema by ID" should be easy task, provided, that I have that ID. In
>>> my scenario I know, that message will be written using one schema, just
>>> different versions of it. So I just need to know, which version it is, so
>>> that I can configure deserializer to enable schema evolution.
>>>
>>> thanks in advance,
>>> Martin
>>>
>>> čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <sv...@csi.se>
>>> napsal:
>>>
>>>> In an avrofile the schema is in the beginning but if you refer a single
>>>> record serialization like Kafka then you have to add something that you can
>>>> use to get hold of the schema. Confluents avroencoder for Kafka uses
>>>> confluents schema registry that uses int32 as schema Id. This is prepended
>>>> (+a magic byte) to the binary avro. Thus using the schema registry again
>>>> you can get the writer schema.
>>>>
>>>> /Svante
>>>>
>>>> On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> just one more question, not strictly related to the subject.
>>>>>
>>>>> Initially I though I'd be OK with using some initial version of schema
>>>>> in place of writer schema. That works, but all columns from schema older
>>>>> than this initial one would be just ignored. So I need to know EXACTLY the
>>>>> schema, which writer used. I know, that avro messages contains either full
>>>>> schema or at least it's ID. Can you point me to the documentation, where
>>>>> this is discussed? So in my deserializer I have byte[] as a input, from
>>>>> which I need to get the schema information first, in order to be able to
>>>>> deserialize the record. I really do not know how to do that, I'm pretty
>>>>> sure I never saw this anywhere, and I cannot find it anywhere. But in
>>>>> principle it must be possible, since reader need not necessarily have any
>>>>> control of which schema writer used.
>>>>>
>>>>> thanks a lot.
>>>>> M.
>>>>>
>>>>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com>
>>>>> napsal:
>>>>>
>>>>>> Thank you very much for in depth answer. I understand how it works
>>>>>> now better, will test it shortly.
>>>>>> Thank you for your time.
>>>>>>
>>>>>> Martin.
>>>>>>
>>>>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com>
>>>>>> napsal:
>>>>>>
>>>>>>> Hello!  It's the same issue in your example code as allegro, even
>>>>>>> with
>>>>>>> the SpecificDatumReader.
>>>>>>>
>>>>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>>>>>> schema)
>>>>>>>
>>>>>>> In Avro, the original schema is commonly known as the writer schema
>>>>>>> (the instance that originally wrote the binary data).  Schema
>>>>>>> evolution applies when you are using the constructor of the
>>>>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>>>>
>>>>>>> As a concrete example, if your original schema was:
>>>>>>>
>>>>>>> {
>>>>>>>   "type": "record",
>>>>>>>   "name": "Simple",
>>>>>>>   "fields": [
>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>     {"name": "name","type": "string"}
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> And you added a field:
>>>>>>>
>>>>>>> {
>>>>>>>   "type": "record",
>>>>>>>   "name": "SimpleV2",
>>>>>>>   "fields": [
>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>     {"name": "name", "type": "string"},
>>>>>>>     {"name": "description","type": ["null", "string"]}
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>>>>> classes are generated from the avro-maven-plugin:
>>>>>>>
>>>>>>> @Test
>>>>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>>>>
>>>>>>>   // Read as Simple v2, same as your method but with the writer and
>>>>>>> reader schema.
>>>>>>>   DatumReader<SimpleV2> datumReader =
>>>>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>>>>> SimpleV2.getClassSchema());
>>>>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes,
>>>>>>> null);
>>>>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>>>>
>>>>>>>   assertThat(v2.getId(), is(1));
>>>>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>>>>   assertThat(v2.getDescription(), nullValue());
>>>>>>> }
>>>>>>>
>>>>>>> This demonstrates with two different schemas and SpecificRecords in
>>>>>>> the same test, but the same principle applies if it's the same record
>>>>>>> that has evolved -- you need to know the original schema that wrote
>>>>>>> the data in order to apply the schema that you're now using for
>>>>>>> reading.
>>>>>>>
>>>>>>> I hope this clarifies what you are looking for!
>>>>>>>
>>>>>>> All my best, Ryan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Thanks for answer.
>>>>>>> >
>>>>>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>>>>>> following deserializer in our other app, which uses strictly avro codebase,
>>>>>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>>>>>> of other tools out of it in our discussion.
>>>>>>> > I can use whichever aproach. All I need is single way, where I can
>>>>>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>>>>>> will respect documentation regarding schema evolution. Currently we're
>>>>>>> using following deserializer and serializer, and these does not work when
>>>>>>> it comes to schema evolution. What is the correct way to serialize and
>>>>>>> deserializer avro data?
>>>>>>> >
>>>>>>> > I probably don't understand your mention about GenericRecord or
>>>>>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>>>>>> below, but then it seems I got back just GenericData$Record instance, which
>>>>>>> I can use then to access array of instances, which is not what I'm looking
>>>>>>> for(IIUC), since in that case I could have just use plain old JSON and
>>>>>>> deserialize it using jackson having no schema evolution problems at all. If
>>>>>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>>>>>> if possible.
>>>>>>> >
>>>>>>> > What can be done? Or how schema evolution is intended to be used?
>>>>>>> I found a lots of question searching for this answer.
>>>>>>> >
>>>>>>> > thanks!
>>>>>>> > Martin.
>>>>>>> >
>>>>>>> > deserializer:
>>>>>>> >
>>>>>>> > public static <T extends SpecificRecordBase> T
>>>>>>> deserialize(Class<T> targetType,
>>>>>>> >
>>>>>>> byte[] data,
>>>>>>> >
>>>>>>> boolean useBinaryDecoder) {
>>>>>>> >         try {
>>>>>>> >             if (data == null) {
>>>>>>> >                 return null;
>>>>>>> >             }
>>>>>>> >
>>>>>>> >             log.trace("data='{}'",
>>>>>>> DatatypeConverter.printHexBinary(data));
>>>>>>> >
>>>>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>>>>> >             DatumReader<GenericRecord> datumReader = new
>>>>>>> SpecificDatumReader<>(schema);
>>>>>>> >             Decoder decoder = useBinaryDecoder
>>>>>>> >                     ? DecoderFactory.get().binaryDecoder(data,
>>>>>>> null)
>>>>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>>>>>> String(data));
>>>>>>> >
>>>>>>> >             T result = targetType.cast(datumReader.read(null,
>>>>>>> decoder));
>>>>>>> >             log.trace("deserialized data='{}'", result);
>>>>>>> >             return result;
>>>>>>> >         } catch (Exception ex) {
>>>>>>> >             throw new SerializationException("Error deserializing
>>>>>>> data", ex);
>>>>>>> >         }
>>>>>>> >     }
>>>>>>> >
>>>>>>> > serializer:
>>>>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T
>>>>>>> data, boolean useBinaryDecoder, boolean pretty) {
>>>>>>> >         try {
>>>>>>> >             if (data == null) {
>>>>>>> >                 return new byte[0];
>>>>>>> >             }
>>>>>>> >
>>>>>>> >             log.debug("data='{}'", data);
>>>>>>> >             Schema schema = data.getSchema();
>>>>>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>>>>>> ByteArrayOutputStream();
>>>>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>>>>> >                     ?
>>>>>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>>>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>>>>>> byteArrayOutputStream, pretty);
>>>>>>> >
>>>>>>> >             DatumWriter<GenericRecord> datumWriter = new
>>>>>>> GenericDatumWriter<>(schema);
>>>>>>> >             datumWriter.write(data, binaryEncoder);
>>>>>>> >
>>>>>>> >             binaryEncoder.flush();
>>>>>>> >             byteArrayOutputStream.close();
>>>>>>> >
>>>>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>>>>> >             log.debug("serialized data='{}'",
>>>>>>> DatatypeConverter.printHexBinary(result));
>>>>>>> >             return result;
>>>>>>> >         } catch (IOException ex) {
>>>>>>> >             throw new SerializationException(
>>>>>>> >                     "Can't serialize data='" + data, ex);
>>>>>>> >         }
>>>>>>> >     }
>>>>>>> >
>>>>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com>
>>>>>>> napsal:
>>>>>>> >>
>>>>>>> >> Hello!  Schema evolution relies on both the writer and reader
>>>>>>> schemas
>>>>>>> >> being available.
>>>>>>> >>
>>>>>>> >> It looks like the allegro tool you are using is using the
>>>>>>> >> GenericDatumReader that assumes the reader and writer schema are
>>>>>>> the
>>>>>>> >> same:
>>>>>>> >>
>>>>>>> >>
>>>>>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>>>>> >>
>>>>>>> >> I do not believe that the "default" value is taken into account
>>>>>>> for
>>>>>>> >> data that is strictly missing from the binary input, just when a
>>>>>>> field
>>>>>>> >> is known to be in the reader schema but missing from the original
>>>>>>> >> writer.
>>>>>>> >>
>>>>>>> >> You may have more luck reading the GenericRecord with a
>>>>>>> >> GenericDatumReader with both schemas, and using the
>>>>>>> >> `convertToJson(record)`.
>>>>>>> >>
>>>>>>> >> I hope this is useful -- Ryan
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>>>>>>> wrote:
>>>>>>> >> >
>>>>>>> >> > Hi,
>>>>>>> >> >
>>>>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>>>>> >> >
>>>>>>> >> > When reading through avro documentation, for example [1], I
>>>>>>> understood, that schema evolution is supported, and if I added column with
>>>>>>> specified default, it should be backwards compatible (and even forward when
>>>>>>> I remove it again). Sounds great, so I added column defined as:
>>>>>>> >> >
>>>>>>> >> >         {
>>>>>>> >> >           "name": "newColumn",
>>>>>>> >> >           "type": ["null","string"],
>>>>>>> >> >           "default": null,
>>>>>>> >> >           "doc": "something wrong"
>>>>>>> >> >         }
>>>>>>> >> >
>>>>>>> >> > and try to consumer some topic having this schema from
>>>>>>> beginning, it fails with message:
>>>>>>> >> >
>>>>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>>>>> >> >     at
>>>>>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>>>>> >> >     at org.apache.avro.io
>>>>>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>>>> >> >     at org.apache.avro.io
>>>>>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>> >> >     at
>>>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>> >> > to give a little bit more information. Avro schema defines one
>>>>>>> top level type, having 2 fields. String describing type of message, and
>>>>>>> union of N types. All N-1, non-modified types can be read, but one updated
>>>>>>> with optional, default-having column cannot be read. I'm not sure if this
>>>>>>> design is strictly speaking correct, but that's not the point (feel free to
>>>>>>> criticise and recommend better approach!). I'm after schema evolution,
>>>>>>> which seems not to be working.
>>>>>>> >> >
>>>>>>> >> >
>>>>>>> >> > And if we alter type definition to:
>>>>>>> >> >
>>>>>>> >> > "type": "string",
>>>>>>> >> > "default": ""
>>>>>>> >> > it still does not work and generated error is:
>>>>>>> >> >
>>>>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed
>>>>>>> data. Length is negative: -1
>>>>>>> >> >     at org.apache.avro.io
>>>>>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>>>>> >> >     at org.apache.avro.io
>>>>>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>>>>>> >> >     at org.apache.avro.io
>>>>>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at
>>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>> >> >     at
>>>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>> >> >
>>>>>>> >> > Am I doing something wrong?
>>>>>>> >> >
>>>>>>> >> > thanks,
>>>>>>> >> > Martin.
>>>>>>> >> >
>>>>>>> >> > [1]
>>>>>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>>>>>
>>>>>>

Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Martin Mucha <al...@gmail.com>.
Thanks for answer!

Ad: "which byte[] are we talking about?" — actually I don't know. Please
lets break it down together.

I'm pretty sure, that we're not using confluent platform(iiuc the paid
bundle, right?). I shared some serializer before [1], so you're saying,
that this wont include neither schema ID, nor schema OK? Ok, lets assume
that. Next. We're using SpringKafka project, to get this serialized data
and send them over kafka. So we don't have any schema registry, but in
principle it could be possible to include schema within each message. But I
cannot see how that could be done. SpringKafka requires us to provide
him org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG,
which we did, but it's just a class calling serializer [1], and from that
point on I have no idea how it could figure out used schema. The question
here I'm asking is, whether when sending avro bytes (obtained by provided
serializer[1]), they are or can be somehow paired with schema used to
serialize data? Is this what kafka senders do, or can do? Include ID/whole
schema somewhere in headers or ...??? And when I read kafka messages, will
the schema be (or could be) somewhere stored in ConsumerRecord or somewhere
like that?

sorry for confused questions, but I'm really missing knowledge to even ask
properly.

thanks,
Martin.

[1]
public static <T extends SpecificRecordBase> byte[] serialize(T data,
boolean useBinaryDecoder, boolean pretty) {
        try {
            if (data == null) {
                return new byte[0];
            }

            log.debug("data='{}'", data);
            Schema schema = data.getSchema();
            ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
            Encoder binaryEncoder = useBinaryDecoder
                    ?
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
                    : EncoderFactory.get().jsonEncoder(schema,
byteArrayOutputStream, pretty);

            DatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(schema);
            datumWriter.write(data, binaryEncoder);

            binaryEncoder.flush();
            byteArrayOutputStream.close();

            byte[] result = byteArrayOutputStream.toByteArray();
            log.debug("serialized data='{}'",
DatatypeConverter.printHexBinary(result));
            return result;
        } catch (IOException ex) {
            throw new SerializationException(
                    "Can't serialize data='" + data, ex);
        }
    }

čt 1. 8. 2019 v 17:06 odesílatel Svante Karlsson <sv...@csi.se>
napsal:

> For clarity: What byte[] are we talking about?
>
> You are slightly missing my point if we are speaking about kafka.
>
> Confluent encoding:
> <byte> <int32_t>    <avro_binary_payload>
> 0          schema_id  avro
>
> avro_binary_payload does not in any case contain the schema or schema id.
> The schema id is a confluent thing. (in an avrofile the schema is prepended
> by value in the file)
>
> While it's trivial to build a schema registry that for example instead
> gives you a md5 hash of the schema you have to use it throughout your
> infrastructure OR use known reader and writer schema (ie hardcoded).
>
> In confluent world the id=N is the N+1'th registered schema in the
> database (a kafka topic) if I remember right. Loose that database and you
> cannot read your kafka topics.
>
> So you have to use some other encoder, homegrown or not that embeds either
> the full schema in every message (expensive) of some id. Does this make
> sense?
>
> /svante
>
>
>
>
>
>
>
>
>
>
> Den tors 1 aug. 2019 kl 16:38 skrev Martin Mucha <al...@gmail.com>:
>
>> Thanks for answer.
>>
>> What I knew already is, that in each message there is _somehow_ present
>> either _some_ schema ID or full schema. I saw some byte array manipulations
>> to get _somehow_ defined schema ID from byte[], which worked, but that's
>> definitely not how it should be done. What I'm looking for is some
>> documentation of _how_ to do these things right. I really cannot find a
>> single thing, yet there must be some util functions, or anything. Is there
>> some devel-first-steps page, where can I find answers for:
>>
>> * How to test, whether byte[] contains full schema or just id?
>> * How to control, whether message is serialized with ID or with full
>> schema?
>> * how to get ID from byte[]?
>> * how to get full schema from byte[]?
>>
>> I don't have confluent platform, and cannot have it, but implementing
>> "get schema by ID" should be easy task, provided, that I have that ID. In
>> my scenario I know, that message will be written using one schema, just
>> different versions of it. So I just need to know, which version it is, so
>> that I can configure deserializer to enable schema evolution.
>>
>> thanks in advance,
>> Martin
>>
>> čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <sv...@csi.se>
>> napsal:
>>
>>> In an avrofile the schema is in the beginning but if you refer a single
>>> record serialization like Kafka then you have to add something that you can
>>> use to get hold of the schema. Confluents avroencoder for Kafka uses
>>> confluents schema registry that uses int32 as schema Id. This is prepended
>>> (+a magic byte) to the binary avro. Thus using the schema registry again
>>> you can get the writer schema.
>>>
>>> /Svante
>>>
>>> On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> just one more question, not strictly related to the subject.
>>>>
>>>> Initially I though I'd be OK with using some initial version of schema
>>>> in place of writer schema. That works, but all columns from schema older
>>>> than this initial one would be just ignored. So I need to know EXACTLY the
>>>> schema, which writer used. I know, that avro messages contains either full
>>>> schema or at least it's ID. Can you point me to the documentation, where
>>>> this is discussed? So in my deserializer I have byte[] as a input, from
>>>> which I need to get the schema information first, in order to be able to
>>>> deserialize the record. I really do not know how to do that, I'm pretty
>>>> sure I never saw this anywhere, and I cannot find it anywhere. But in
>>>> principle it must be possible, since reader need not necessarily have any
>>>> control of which schema writer used.
>>>>
>>>> thanks a lot.
>>>> M.
>>>>
>>>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com>
>>>> napsal:
>>>>
>>>>> Thank you very much for in depth answer. I understand how it works now
>>>>> better, will test it shortly.
>>>>> Thank you for your time.
>>>>>
>>>>> Martin.
>>>>>
>>>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com>
>>>>> napsal:
>>>>>
>>>>>> Hello!  It's the same issue in your example code as allegro, even with
>>>>>> the SpecificDatumReader.
>>>>>>
>>>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>>>>> schema)
>>>>>>
>>>>>> In Avro, the original schema is commonly known as the writer schema
>>>>>> (the instance that originally wrote the binary data).  Schema
>>>>>> evolution applies when you are using the constructor of the
>>>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>>>
>>>>>> As a concrete example, if your original schema was:
>>>>>>
>>>>>> {
>>>>>>   "type": "record",
>>>>>>   "name": "Simple",
>>>>>>   "fields": [
>>>>>>     {"name": "id", "type": "int"},
>>>>>>     {"name": "name","type": "string"}
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> And you added a field:
>>>>>>
>>>>>> {
>>>>>>   "type": "record",
>>>>>>   "name": "SimpleV2",
>>>>>>   "fields": [
>>>>>>     {"name": "id", "type": "int"},
>>>>>>     {"name": "name", "type": "string"},
>>>>>>     {"name": "description","type": ["null", "string"]}
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>>>> classes are generated from the avro-maven-plugin:
>>>>>>
>>>>>> @Test
>>>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>>>
>>>>>>   // Read as Simple v2, same as your method but with the writer and
>>>>>> reader schema.
>>>>>>   DatumReader<SimpleV2> datumReader =
>>>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>>>> SimpleV2.getClassSchema());
>>>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes,
>>>>>> null);
>>>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>>>
>>>>>>   assertThat(v2.getId(), is(1));
>>>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>>>   assertThat(v2.getDescription(), nullValue());
>>>>>> }
>>>>>>
>>>>>> This demonstrates with two different schemas and SpecificRecords in
>>>>>> the same test, but the same principle applies if it's the same record
>>>>>> that has evolved -- you need to know the original schema that wrote
>>>>>> the data in order to apply the schema that you're now using for
>>>>>> reading.
>>>>>>
>>>>>> I hope this clarifies what you are looking for!
>>>>>>
>>>>>> All my best, Ryan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Thanks for answer.
>>>>>> >
>>>>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>>>>> following deserializer in our other app, which uses strictly avro codebase,
>>>>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>>>>> of other tools out of it in our discussion.
>>>>>> > I can use whichever aproach. All I need is single way, where I can
>>>>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>>>>> will respect documentation regarding schema evolution. Currently we're
>>>>>> using following deserializer and serializer, and these does not work when
>>>>>> it comes to schema evolution. What is the correct way to serialize and
>>>>>> deserializer avro data?
>>>>>> >
>>>>>> > I probably don't understand your mention about GenericRecord or
>>>>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>>>>> below, but then it seems I got back just GenericData$Record instance, which
>>>>>> I can use then to access array of instances, which is not what I'm looking
>>>>>> for(IIUC), since in that case I could have just use plain old JSON and
>>>>>> deserialize it using jackson having no schema evolution problems at all. If
>>>>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>>>>> if possible.
>>>>>> >
>>>>>> > What can be done? Or how schema evolution is intended to be used? I
>>>>>> found a lots of question searching for this answer.
>>>>>> >
>>>>>> > thanks!
>>>>>> > Martin.
>>>>>> >
>>>>>> > deserializer:
>>>>>> >
>>>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>>>>>> targetType,
>>>>>> >
>>>>>> byte[] data,
>>>>>> >
>>>>>> boolean useBinaryDecoder) {
>>>>>> >         try {
>>>>>> >             if (data == null) {
>>>>>> >                 return null;
>>>>>> >             }
>>>>>> >
>>>>>> >             log.trace("data='{}'",
>>>>>> DatatypeConverter.printHexBinary(data));
>>>>>> >
>>>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>>>> >             DatumReader<GenericRecord> datumReader = new
>>>>>> SpecificDatumReader<>(schema);
>>>>>> >             Decoder decoder = useBinaryDecoder
>>>>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>>>>> String(data));
>>>>>> >
>>>>>> >             T result = targetType.cast(datumReader.read(null,
>>>>>> decoder));
>>>>>> >             log.trace("deserialized data='{}'", result);
>>>>>> >             return result;
>>>>>> >         } catch (Exception ex) {
>>>>>> >             throw new SerializationException("Error deserializing
>>>>>> data", ex);
>>>>>> >         }
>>>>>> >     }
>>>>>> >
>>>>>> > serializer:
>>>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T
>>>>>> data, boolean useBinaryDecoder, boolean pretty) {
>>>>>> >         try {
>>>>>> >             if (data == null) {
>>>>>> >                 return new byte[0];
>>>>>> >             }
>>>>>> >
>>>>>> >             log.debug("data='{}'", data);
>>>>>> >             Schema schema = data.getSchema();
>>>>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>>>>> ByteArrayOutputStream();
>>>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>>>> >                     ?
>>>>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>>>>> byteArrayOutputStream, pretty);
>>>>>> >
>>>>>> >             DatumWriter<GenericRecord> datumWriter = new
>>>>>> GenericDatumWriter<>(schema);
>>>>>> >             datumWriter.write(data, binaryEncoder);
>>>>>> >
>>>>>> >             binaryEncoder.flush();
>>>>>> >             byteArrayOutputStream.close();
>>>>>> >
>>>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>>>> >             log.debug("serialized data='{}'",
>>>>>> DatatypeConverter.printHexBinary(result));
>>>>>> >             return result;
>>>>>> >         } catch (IOException ex) {
>>>>>> >             throw new SerializationException(
>>>>>> >                     "Can't serialize data='" + data, ex);
>>>>>> >         }
>>>>>> >     }
>>>>>> >
>>>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com>
>>>>>> napsal:
>>>>>> >>
>>>>>> >> Hello!  Schema evolution relies on both the writer and reader
>>>>>> schemas
>>>>>> >> being available.
>>>>>> >>
>>>>>> >> It looks like the allegro tool you are using is using the
>>>>>> >> GenericDatumReader that assumes the reader and writer schema are
>>>>>> the
>>>>>> >> same:
>>>>>> >>
>>>>>> >>
>>>>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>>>> >>
>>>>>> >> I do not believe that the "default" value is taken into account for
>>>>>> >> data that is strictly missing from the binary input, just when a
>>>>>> field
>>>>>> >> is known to be in the reader schema but missing from the original
>>>>>> >> writer.
>>>>>> >>
>>>>>> >> You may have more luck reading the GenericRecord with a
>>>>>> >> GenericDatumReader with both schemas, and using the
>>>>>> >> `convertToJson(record)`.
>>>>>> >>
>>>>>> >> I hope this is useful -- Ryan
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>>>>>> wrote:
>>>>>> >> >
>>>>>> >> > Hi,
>>>>>> >> >
>>>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>>>> >> >
>>>>>> >> > When reading through avro documentation, for example [1], I
>>>>>> understood, that schema evolution is supported, and if I added column with
>>>>>> specified default, it should be backwards compatible (and even forward when
>>>>>> I remove it again). Sounds great, so I added column defined as:
>>>>>> >> >
>>>>>> >> >         {
>>>>>> >> >           "name": "newColumn",
>>>>>> >> >           "type": ["null","string"],
>>>>>> >> >           "default": null,
>>>>>> >> >           "doc": "something wrong"
>>>>>> >> >         }
>>>>>> >> >
>>>>>> >> > and try to consumer some topic having this schema from
>>>>>> beginning, it fails with message:
>>>>>> >> >
>>>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>>>> >> >     at
>>>>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>>>> >> >     at org.apache.avro.io
>>>>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>>> >> >     at org.apache.avro.io
>>>>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>> >> >     at
>>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>> >> > to give a little bit more information. Avro schema defines one
>>>>>> top level type, having 2 fields. String describing type of message, and
>>>>>> union of N types. All N-1, non-modified types can be read, but one updated
>>>>>> with optional, default-having column cannot be read. I'm not sure if this
>>>>>> design is strictly speaking correct, but that's not the point (feel free to
>>>>>> criticise and recommend better approach!). I'm after schema evolution,
>>>>>> which seems not to be working.
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > And if we alter type definition to:
>>>>>> >> >
>>>>>> >> > "type": "string",
>>>>>> >> > "default": ""
>>>>>> >> > it still does not work and generated error is:
>>>>>> >> >
>>>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>>>>>> Length is negative: -1
>>>>>> >> >     at org.apache.avro.io
>>>>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>>>> >> >     at org.apache.avro.io
>>>>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>>>>> >> >     at org.apache.avro.io
>>>>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>> >> >     at
>>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>> >> >     at
>>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>> >> >
>>>>>> >> > Am I doing something wrong?
>>>>>> >> >
>>>>>> >> > thanks,
>>>>>> >> > Martin.
>>>>>> >> >
>>>>>> >> > [1]
>>>>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>>>>
>>>>>

Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Svante Karlsson <sv...@csi.se>.
For clarity: What byte[] are we talking about?

You are slightly missing my point if we are speaking about kafka.

Confluent encoding:
<byte> <int32_t>    <avro_binary_payload>
0          schema_id  avro

avro_binary_payload does not in any case contain the schema or schema id.
The schema id is a confluent thing. (in an avrofile the schema is prepended
by value in the file)

While it's trivial to build a schema registry that for example instead
gives you a md5 hash of the schema you have to use it throughout your
infrastructure OR use known reader and writer schema (ie hardcoded).

In confluent world the id=N is the N+1'th registered schema in the database
(a kafka topic) if I remember right. Loose that database and you cannot
read your kafka topics.

So you have to use some other encoder, homegrown or not that embeds either
the full schema in every message (expensive) of some id. Does this make
sense?

/svante










Den tors 1 aug. 2019 kl 16:38 skrev Martin Mucha <al...@gmail.com>:

> Thanks for answer.
>
> What I knew already is, that in each message there is _somehow_ present
> either _some_ schema ID or full schema. I saw some byte array manipulations
> to get _somehow_ defined schema ID from byte[], which worked, but that's
> definitely not how it should be done. What I'm looking for is some
> documentation of _how_ to do these things right. I really cannot find a
> single thing, yet there must be some util functions, or anything. Is there
> some devel-first-steps page, where can I find answers for:
>
> * How to test, whether byte[] contains full schema or just id?
> * How to control, whether message is serialized with ID or with full
> schema?
> * how to get ID from byte[]?
> * how to get full schema from byte[]?
>
> I don't have confluent platform, and cannot have it, but implementing "get
> schema by ID" should be easy task, provided, that I have that ID. In my
> scenario I know, that message will be written using one schema, just
> different versions of it. So I just need to know, which version it is, so
> that I can configure deserializer to enable schema evolution.
>
> thanks in advance,
> Martin
>
> čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <sv...@csi.se>
> napsal:
>
>> In an avrofile the schema is in the beginning but if you refer a single
>> record serialization like Kafka then you have to add something that you can
>> use to get hold of the schema. Confluents avroencoder for Kafka uses
>> confluents schema registry that uses int32 as schema Id. This is prepended
>> (+a magic byte) to the binary avro. Thus using the schema registry again
>> you can get the writer schema.
>>
>> /Svante
>>
>> On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> just one more question, not strictly related to the subject.
>>>
>>> Initially I though I'd be OK with using some initial version of schema
>>> in place of writer schema. That works, but all columns from schema older
>>> than this initial one would be just ignored. So I need to know EXACTLY the
>>> schema, which writer used. I know, that avro messages contains either full
>>> schema or at least it's ID. Can you point me to the documentation, where
>>> this is discussed? So in my deserializer I have byte[] as a input, from
>>> which I need to get the schema information first, in order to be able to
>>> deserialize the record. I really do not know how to do that, I'm pretty
>>> sure I never saw this anywhere, and I cannot find it anywhere. But in
>>> principle it must be possible, since reader need not necessarily have any
>>> control of which schema writer used.
>>>
>>> thanks a lot.
>>> M.
>>>
>>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com>
>>> napsal:
>>>
>>>> Thank you very much for in depth answer. I understand how it works now
>>>> better, will test it shortly.
>>>> Thank you for your time.
>>>>
>>>> Martin.
>>>>
>>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>>>>
>>>>> Hello!  It's the same issue in your example code as allegro, even with
>>>>> the SpecificDatumReader.
>>>>>
>>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>>>> schema)
>>>>>
>>>>> In Avro, the original schema is commonly known as the writer schema
>>>>> (the instance that originally wrote the binary data).  Schema
>>>>> evolution applies when you are using the constructor of the
>>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>>
>>>>> As a concrete example, if your original schema was:
>>>>>
>>>>> {
>>>>>   "type": "record",
>>>>>   "name": "Simple",
>>>>>   "fields": [
>>>>>     {"name": "id", "type": "int"},
>>>>>     {"name": "name","type": "string"}
>>>>>   ]
>>>>> }
>>>>>
>>>>> And you added a field:
>>>>>
>>>>> {
>>>>>   "type": "record",
>>>>>   "name": "SimpleV2",
>>>>>   "fields": [
>>>>>     {"name": "id", "type": "int"},
>>>>>     {"name": "name", "type": "string"},
>>>>>     {"name": "description","type": ["null", "string"]}
>>>>>   ]
>>>>> }
>>>>>
>>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>>> classes are generated from the avro-maven-plugin:
>>>>>
>>>>> @Test
>>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>>
>>>>>   // Read as Simple v2, same as your method but with the writer and
>>>>> reader schema.
>>>>>   DatumReader<SimpleV2> datumReader =
>>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>>> SimpleV2.getClassSchema());
>>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes,
>>>>> null);
>>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>>
>>>>>   assertThat(v2.getId(), is(1));
>>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>>   assertThat(v2.getDescription(), nullValue());
>>>>> }
>>>>>
>>>>> This demonstrates with two different schemas and SpecificRecords in
>>>>> the same test, but the same principle applies if it's the same record
>>>>> that has evolved -- you need to know the original schema that wrote
>>>>> the data in order to apply the schema that you're now using for
>>>>> reading.
>>>>>
>>>>> I hope this clarifies what you are looking for!
>>>>>
>>>>> All my best, Ryan
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> > Thanks for answer.
>>>>> >
>>>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>>>> following deserializer in our other app, which uses strictly avro codebase,
>>>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>>>> of other tools out of it in our discussion.
>>>>> > I can use whichever aproach. All I need is single way, where I can
>>>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>>>> will respect documentation regarding schema evolution. Currently we're
>>>>> using following deserializer and serializer, and these does not work when
>>>>> it comes to schema evolution. What is the correct way to serialize and
>>>>> deserializer avro data?
>>>>> >
>>>>> > I probably don't understand your mention about GenericRecord or
>>>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>>>> below, but then it seems I got back just GenericData$Record instance, which
>>>>> I can use then to access array of instances, which is not what I'm looking
>>>>> for(IIUC), since in that case I could have just use plain old JSON and
>>>>> deserialize it using jackson having no schema evolution problems at all. If
>>>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>>>> if possible.
>>>>> >
>>>>> > What can be done? Or how schema evolution is intended to be used? I
>>>>> found a lots of question searching for this answer.
>>>>> >
>>>>> > thanks!
>>>>> > Martin.
>>>>> >
>>>>> > deserializer:
>>>>> >
>>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>>>>> targetType,
>>>>> >
>>>>> byte[] data,
>>>>> >
>>>>> boolean useBinaryDecoder) {
>>>>> >         try {
>>>>> >             if (data == null) {
>>>>> >                 return null;
>>>>> >             }
>>>>> >
>>>>> >             log.trace("data='{}'",
>>>>> DatatypeConverter.printHexBinary(data));
>>>>> >
>>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>>> >             DatumReader<GenericRecord> datumReader = new
>>>>> SpecificDatumReader<>(schema);
>>>>> >             Decoder decoder = useBinaryDecoder
>>>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>>>> String(data));
>>>>> >
>>>>> >             T result = targetType.cast(datumReader.read(null,
>>>>> decoder));
>>>>> >             log.trace("deserialized data='{}'", result);
>>>>> >             return result;
>>>>> >         } catch (Exception ex) {
>>>>> >             throw new SerializationException("Error deserializing
>>>>> data", ex);
>>>>> >         }
>>>>> >     }
>>>>> >
>>>>> > serializer:
>>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T
>>>>> data, boolean useBinaryDecoder, boolean pretty) {
>>>>> >         try {
>>>>> >             if (data == null) {
>>>>> >                 return new byte[0];
>>>>> >             }
>>>>> >
>>>>> >             log.debug("data='{}'", data);
>>>>> >             Schema schema = data.getSchema();
>>>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>>>> ByteArrayOutputStream();
>>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>>> >                     ?
>>>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>>>> byteArrayOutputStream, pretty);
>>>>> >
>>>>> >             DatumWriter<GenericRecord> datumWriter = new
>>>>> GenericDatumWriter<>(schema);
>>>>> >             datumWriter.write(data, binaryEncoder);
>>>>> >
>>>>> >             binaryEncoder.flush();
>>>>> >             byteArrayOutputStream.close();
>>>>> >
>>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>>> >             log.debug("serialized data='{}'",
>>>>> DatatypeConverter.printHexBinary(result));
>>>>> >             return result;
>>>>> >         } catch (IOException ex) {
>>>>> >             throw new SerializationException(
>>>>> >                     "Can't serialize data='" + data, ex);
>>>>> >         }
>>>>> >     }
>>>>> >
>>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com>
>>>>> napsal:
>>>>> >>
>>>>> >> Hello!  Schema evolution relies on both the writer and reader
>>>>> schemas
>>>>> >> being available.
>>>>> >>
>>>>> >> It looks like the allegro tool you are using is using the
>>>>> >> GenericDatumReader that assumes the reader and writer schema are the
>>>>> >> same:
>>>>> >>
>>>>> >>
>>>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>>> >>
>>>>> >> I do not believe that the "default" value is taken into account for
>>>>> >> data that is strictly missing from the binary input, just when a
>>>>> field
>>>>> >> is known to be in the reader schema but missing from the original
>>>>> >> writer.
>>>>> >>
>>>>> >> You may have more luck reading the GenericRecord with a
>>>>> >> GenericDatumReader with both schemas, and using the
>>>>> >> `convertToJson(record)`.
>>>>> >>
>>>>> >> I hope this is useful -- Ryan
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > Hi,
>>>>> >> >
>>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>>> >> >
>>>>> >> > When reading through avro documentation, for example [1], I
>>>>> understood, that schema evolution is supported, and if I added column with
>>>>> specified default, it should be backwards compatible (and even forward when
>>>>> I remove it again). Sounds great, so I added column defined as:
>>>>> >> >
>>>>> >> >         {
>>>>> >> >           "name": "newColumn",
>>>>> >> >           "type": ["null","string"],
>>>>> >> >           "default": null,
>>>>> >> >           "doc": "something wrong"
>>>>> >> >         }
>>>>> >> >
>>>>> >> > and try to consumer some topic having this schema from beginning,
>>>>> it fails with message:
>>>>> >> >
>>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>>> >> >     at
>>>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>>> >> >     at org.apache.avro.io
>>>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>> >> >     at org.apache.avro.io
>>>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>> >> >     at
>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>> >> > to give a little bit more information. Avro schema defines one
>>>>> top level type, having 2 fields. String describing type of message, and
>>>>> union of N types. All N-1, non-modified types can be read, but one updated
>>>>> with optional, default-having column cannot be read. I'm not sure if this
>>>>> design is strictly speaking correct, but that's not the point (feel free to
>>>>> criticise and recommend better approach!). I'm after schema evolution,
>>>>> which seems not to be working.
>>>>> >> >
>>>>> >> >
>>>>> >> > And if we alter type definition to:
>>>>> >> >
>>>>> >> > "type": "string",
>>>>> >> > "default": ""
>>>>> >> > it still does not work and generated error is:
>>>>> >> >
>>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>>>>> Length is negative: -1
>>>>> >> >     at org.apache.avro.io
>>>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>>> >> >     at org.apache.avro.io
>>>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>>>> >> >     at org.apache.avro.io
>>>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> >> >     at
>>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>> >> >     at
>>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>> >> >
>>>>> >> > Am I doing something wrong?
>>>>> >> >
>>>>> >> > thanks,
>>>>> >> > Martin.
>>>>> >> >
>>>>> >> > [1]
>>>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>>>
>>>>

Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Martin Mucha <al...@gmail.com>.
Thanks for answer.

What I knew already is, that in each message there is _somehow_ present
either _some_ schema ID or full schema. I saw some byte array manipulations
to get _somehow_ defined schema ID from byte[], which worked, but that's
definitely not how it should be done. What I'm looking for is some
documentation of _how_ to do these things right. I really cannot find a
single thing, yet there must be some util functions, or anything. Is there
some devel-first-steps page, where can I find answers for:

* How to test, whether byte[] contains full schema or just id?
* How to control, whether message is serialized with ID or with full schema?
* how to get ID from byte[]?
* how to get full schema from byte[]?

I don't have confluent platform, and cannot have it, but implementing "get
schema by ID" should be easy task, provided, that I have that ID. In my
scenario I know, that message will be written using one schema, just
different versions of it. So I just need to know, which version it is, so
that I can configure deserializer to enable schema evolution.

thanks in advance,
Martin

čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <sv...@csi.se>
napsal:

> In an avrofile the schema is in the beginning but if you refer a single
> record serialization like Kafka then you have to add something that you can
> use to get hold of the schema. Confluents avroencoder for Kafka uses
> confluents schema registry that uses int32 as schema Id. This is prepended
> (+a magic byte) to the binary avro. Thus using the schema registry again
> you can get the writer schema.
>
> /Svante
>
> On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:
>
>> Hi,
>>
>> just one more question, not strictly related to the subject.
>>
>> Initially I though I'd be OK with using some initial version of schema in
>> place of writer schema. That works, but all columns from schema older than
>> this initial one would be just ignored. So I need to know EXACTLY the
>> schema, which writer used. I know, that avro messages contains either full
>> schema or at least it's ID. Can you point me to the documentation, where
>> this is discussed? So in my deserializer I have byte[] as a input, from
>> which I need to get the schema information first, in order to be able to
>> deserialize the record. I really do not know how to do that, I'm pretty
>> sure I never saw this anywhere, and I cannot find it anywhere. But in
>> principle it must be possible, since reader need not necessarily have any
>> control of which schema writer used.
>>
>> thanks a lot.
>> M.
>>
>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com>
>> napsal:
>>
>>> Thank you very much for in depth answer. I understand how it works now
>>> better, will test it shortly.
>>> Thank you for your time.
>>>
>>> Martin.
>>>
>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>>>
>>>> Hello!  It's the same issue in your example code as allegro, even with
>>>> the SpecificDatumReader.
>>>>
>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>>> schema)
>>>>
>>>> In Avro, the original schema is commonly known as the writer schema
>>>> (the instance that originally wrote the binary data).  Schema
>>>> evolution applies when you are using the constructor of the
>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>
>>>> As a concrete example, if your original schema was:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "Simple",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name","type": "string"}
>>>>   ]
>>>> }
>>>>
>>>> And you added a field:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "SimpleV2",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name", "type": "string"},
>>>>     {"name": "description","type": ["null", "string"]}
>>>>   ]
>>>> }
>>>>
>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>> classes are generated from the avro-maven-plugin:
>>>>
>>>> @Test
>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>
>>>>   // Read as Simple v2, same as your method but with the writer and
>>>> reader schema.
>>>>   DatumReader<SimpleV2> datumReader =
>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>> SimpleV2.getClassSchema());
>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>
>>>>   assertThat(v2.getId(), is(1));
>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>   assertThat(v2.getDescription(), nullValue());
>>>> }
>>>>
>>>> This demonstrates with two different schemas and SpecificRecords in
>>>> the same test, but the same principle applies if it's the same record
>>>> that has evolved -- you need to know the original schema that wrote
>>>> the data in order to apply the schema that you're now using for
>>>> reading.
>>>>
>>>> I hope this clarifies what you are looking for!
>>>>
>>>> All my best, Ryan
>>>>
>>>>
>>>>
>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Thanks for answer.
>>>> >
>>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>>> following deserializer in our other app, which uses strictly avro codebase,
>>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>>> of other tools out of it in our discussion.
>>>> > I can use whichever aproach. All I need is single way, where I can
>>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>>> will respect documentation regarding schema evolution. Currently we're
>>>> using following deserializer and serializer, and these does not work when
>>>> it comes to schema evolution. What is the correct way to serialize and
>>>> deserializer avro data?
>>>> >
>>>> > I probably don't understand your mention about GenericRecord or
>>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>>> below, but then it seems I got back just GenericData$Record instance, which
>>>> I can use then to access array of instances, which is not what I'm looking
>>>> for(IIUC), since in that case I could have just use plain old JSON and
>>>> deserialize it using jackson having no schema evolution problems at all. If
>>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>>> if possible.
>>>> >
>>>> > What can be done? Or how schema evolution is intended to be used? I
>>>> found a lots of question searching for this answer.
>>>> >
>>>> > thanks!
>>>> > Martin.
>>>> >
>>>> > deserializer:
>>>> >
>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>>>> targetType,
>>>> >                                                                byte[]
>>>> data,
>>>> >
>>>> boolean useBinaryDecoder) {
>>>> >         try {
>>>> >             if (data == null) {
>>>> >                 return null;
>>>> >             }
>>>> >
>>>> >             log.trace("data='{}'",
>>>> DatatypeConverter.printHexBinary(data));
>>>> >
>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>> >             DatumReader<GenericRecord> datumReader = new
>>>> SpecificDatumReader<>(schema);
>>>> >             Decoder decoder = useBinaryDecoder
>>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>>> String(data));
>>>> >
>>>> >             T result = targetType.cast(datumReader.read(null,
>>>> decoder));
>>>> >             log.trace("deserialized data='{}'", result);
>>>> >             return result;
>>>> >         } catch (Exception ex) {
>>>> >             throw new SerializationException("Error deserializing
>>>> data", ex);
>>>> >         }
>>>> >     }
>>>> >
>>>> > serializer:
>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data,
>>>> boolean useBinaryDecoder, boolean pretty) {
>>>> >         try {
>>>> >             if (data == null) {
>>>> >                 return new byte[0];
>>>> >             }
>>>> >
>>>> >             log.debug("data='{}'", data);
>>>> >             Schema schema = data.getSchema();
>>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>>> ByteArrayOutputStream();
>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>> >                     ?
>>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>>> byteArrayOutputStream, pretty);
>>>> >
>>>> >             DatumWriter<GenericRecord> datumWriter = new
>>>> GenericDatumWriter<>(schema);
>>>> >             datumWriter.write(data, binaryEncoder);
>>>> >
>>>> >             binaryEncoder.flush();
>>>> >             byteArrayOutputStream.close();
>>>> >
>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>> >             log.debug("serialized data='{}'",
>>>> DatatypeConverter.printHexBinary(result));
>>>> >             return result;
>>>> >         } catch (IOException ex) {
>>>> >             throw new SerializationException(
>>>> >                     "Can't serialize data='" + data, ex);
>>>> >         }
>>>> >     }
>>>> >
>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com>
>>>> napsal:
>>>> >>
>>>> >> Hello!  Schema evolution relies on both the writer and reader schemas
>>>> >> being available.
>>>> >>
>>>> >> It looks like the allegro tool you are using is using the
>>>> >> GenericDatumReader that assumes the reader and writer schema are the
>>>> >> same:
>>>> >>
>>>> >>
>>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>> >>
>>>> >> I do not believe that the "default" value is taken into account for
>>>> >> data that is strictly missing from the binary input, just when a
>>>> field
>>>> >> is known to be in the reader schema but missing from the original
>>>> >> writer.
>>>> >>
>>>> >> You may have more luck reading the GenericRecord with a
>>>> >> GenericDatumReader with both schemas, and using the
>>>> >> `convertToJson(record)`.
>>>> >>
>>>> >> I hope this is useful -- Ryan
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>> >> >
>>>> >> > When reading through avro documentation, for example [1], I
>>>> understood, that schema evolution is supported, and if I added column with
>>>> specified default, it should be backwards compatible (and even forward when
>>>> I remove it again). Sounds great, so I added column defined as:
>>>> >> >
>>>> >> >         {
>>>> >> >           "name": "newColumn",
>>>> >> >           "type": ["null","string"],
>>>> >> >           "default": null,
>>>> >> >           "doc": "something wrong"
>>>> >> >         }
>>>> >> >
>>>> >> > and try to consumer some topic having this schema from beginning,
>>>> it fails with message:
>>>> >> >
>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>> >> >     at
>>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> >     at
>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> > to give a little bit more information. Avro schema defines one top
>>>> level type, having 2 fields. String describing type of message, and union
>>>> of N types. All N-1, non-modified types can be read, but one updated with
>>>> optional, default-having column cannot be read. I'm not sure if this design
>>>> is strictly speaking correct, but that's not the point (feel free to
>>>> criticise and recommend better approach!). I'm after schema evolution,
>>>> which seems not to be working.
>>>> >> >
>>>> >> >
>>>> >> > And if we alter type definition to:
>>>> >> >
>>>> >> > "type": "string",
>>>> >> > "default": ""
>>>> >> > it still does not work and generated error is:
>>>> >> >
>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>>>> Length is negative: -1
>>>> >> >     at org.apache.avro.io
>>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>> >> >     at org.apache.avro.io
>>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> >     at
>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> >
>>>> >> > Am I doing something wrong?
>>>> >> >
>>>> >> > thanks,
>>>> >> > Martin.
>>>> >> >
>>>> >> > [1]
>>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>>
>>>

Re: AVRO schema evolution: adding optional column with default fails deserialization

Posted by Svante Karlsson <sv...@csi.se>.
In an avrofile the schema is in the beginning but if you refer a single
record serialization like Kafka then you have to add something that you can
use to get hold of the schema. Confluents avroencoder for Kafka uses
confluents schema registry that uses int32 as schema Id. This is prepended
(+a magic byte) to the binary avro. Thus using the schema registry again
you can get the writer schema.

/Svante

On Thu, Aug 1, 2019, 15:30 Martin Mucha <al...@gmail.com> wrote:

> Hi,
>
> just one more question, not strictly related to the subject.
>
> Initially I though I'd be OK with using some initial version of schema in
> place of writer schema. That works, but all columns from schema older than
> this initial one would be just ignored. So I need to know EXACTLY the
> schema, which writer used. I know, that avro messages contains either full
> schema or at least it's ID. Can you point me to the documentation, where
> this is discussed? So in my deserializer I have byte[] as a input, from
> which I need to get the schema information first, in order to be able to
> deserialize the record. I really do not know how to do that, I'm pretty
> sure I never saw this anywhere, and I cannot find it anywhere. But in
> principle it must be possible, since reader need not necessarily have any
> control of which schema writer used.
>
> thanks a lot.
> M.
>
> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <al...@gmail.com>
> napsal:
>
>> Thank you very much for in depth answer. I understand how it works now
>> better, will test it shortly.
>> Thank you for your time.
>>
>> Martin.
>>
>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <ry...@skraba.com> napsal:
>>
>>> Hello!  It's the same issue in your example code as allegro, even with
>>> the SpecificDatumReader.
>>>
>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>> schema)
>>>
>>> In Avro, the original schema is commonly known as the writer schema
>>> (the instance that originally wrote the binary data).  Schema
>>> evolution applies when you are using the constructor of the
>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>
>>> As a concrete example, if your original schema was:
>>>
>>> {
>>>   "type": "record",
>>>   "name": "Simple",
>>>   "fields": [
>>>     {"name": "id", "type": "int"},
>>>     {"name": "name","type": "string"}
>>>   ]
>>> }
>>>
>>> And you added a field:
>>>
>>> {
>>>   "type": "record",
>>>   "name": "SimpleV2",
>>>   "fields": [
>>>     {"name": "id", "type": "int"},
>>>     {"name": "name", "type": "string"},
>>>     {"name": "description","type": ["null", "string"]}
>>>   ]
>>> }
>>>
>>> You could do the following safely, assuming that Simple and SimpleV2
>>> classes are generated from the avro-maven-plugin:
>>>
>>> @Test
>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>   // Write a Simple v1 to bytes using your exact method.
>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>
>>>   // Read as Simple v2, same as your method but with the writer and
>>> reader schema.
>>>   DatumReader<SimpleV2> datumReader =
>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>> SimpleV2.getClassSchema());
>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>
>>>   assertThat(v2.getId(), is(1));
>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>   assertThat(v2.getDescription(), nullValue());
>>> }
>>>
>>> This demonstrates with two different schemas and SpecificRecords in
>>> the same test, but the same principle applies if it's the same record
>>> that has evolved -- you need to know the original schema that wrote
>>> the data in order to apply the schema that you're now using for
>>> reading.
>>>
>>> I hope this clarifies what you are looking for!
>>>
>>> All my best, Ryan
>>>
>>>
>>>
>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <al...@gmail.com> wrote:
>>> >
>>> > Thanks for answer.
>>> >
>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>> following deserializer in our other app, which uses strictly avro codebase,
>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>> of other tools out of it in our discussion.
>>> > I can use whichever aproach. All I need is single way, where I can
>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>> will respect documentation regarding schema evolution. Currently we're
>>> using following deserializer and serializer, and these does not work when
>>> it comes to schema evolution. What is the correct way to serialize and
>>> deserializer avro data?
>>> >
>>> > I probably don't understand your mention about GenericRecord or
>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>> below, but then it seems I got back just GenericData$Record instance, which
>>> I can use then to access array of instances, which is not what I'm looking
>>> for(IIUC), since in that case I could have just use plain old JSON and
>>> deserialize it using jackson having no schema evolution problems at all. If
>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>> if possible.
>>> >
>>> > What can be done? Or how schema evolution is intended to be used? I
>>> found a lots of question searching for this answer.
>>> >
>>> > thanks!
>>> > Martin.
>>> >
>>> > deserializer:
>>> >
>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>>> targetType,
>>> >                                                                byte[]
>>> data,
>>> >                                                                boolean
>>> useBinaryDecoder) {
>>> >         try {
>>> >             if (data == null) {
>>> >                 return null;
>>> >             }
>>> >
>>> >             log.trace("data='{}'",
>>> DatatypeConverter.printHexBinary(data));
>>> >
>>> >             Schema schema = targetType.newInstance().getSchema();
>>> >             DatumReader<GenericRecord> datumReader = new
>>> SpecificDatumReader<>(schema);
>>> >             Decoder decoder = useBinaryDecoder
>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>> String(data));
>>> >
>>> >             T result = targetType.cast(datumReader.read(null,
>>> decoder));
>>> >             log.trace("deserialized data='{}'", result);
>>> >             return result;
>>> >         } catch (Exception ex) {
>>> >             throw new SerializationException("Error deserializing
>>> data", ex);
>>> >         }
>>> >     }
>>> >
>>> > serializer:
>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data,
>>> boolean useBinaryDecoder, boolean pretty) {
>>> >         try {
>>> >             if (data == null) {
>>> >                 return new byte[0];
>>> >             }
>>> >
>>> >             log.debug("data='{}'", data);
>>> >             Schema schema = data.getSchema();
>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>> ByteArrayOutputStream();
>>> >             Encoder binaryEncoder = useBinaryDecoder
>>> >                     ?
>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>> byteArrayOutputStream, pretty);
>>> >
>>> >             DatumWriter<GenericRecord> datumWriter = new
>>> GenericDatumWriter<>(schema);
>>> >             datumWriter.write(data, binaryEncoder);
>>> >
>>> >             binaryEncoder.flush();
>>> >             byteArrayOutputStream.close();
>>> >
>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>> >             log.debug("serialized data='{}'",
>>> DatatypeConverter.printHexBinary(result));
>>> >             return result;
>>> >         } catch (IOException ex) {
>>> >             throw new SerializationException(
>>> >                     "Can't serialize data='" + data, ex);
>>> >         }
>>> >     }
>>> >
>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <ry...@skraba.com>
>>> napsal:
>>> >>
>>> >> Hello!  Schema evolution relies on both the writer and reader schemas
>>> >> being available.
>>> >>
>>> >> It looks like the allegro tool you are using is using the
>>> >> GenericDatumReader that assumes the reader and writer schema are the
>>> >> same:
>>> >>
>>> >>
>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>> >>
>>> >> I do not believe that the "default" value is taken into account for
>>> >> data that is strictly missing from the binary input, just when a field
>>> >> is known to be in the reader schema but missing from the original
>>> >> writer.
>>> >>
>>> >> You may have more luck reading the GenericRecord with a
>>> >> GenericDatumReader with both schemas, and using the
>>> >> `convertToJson(record)`.
>>> >>
>>> >> I hope this is useful -- Ryan
>>> >>
>>> >>
>>> >>
>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <al...@gmail.com>
>>> wrote:
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>> >> >
>>> >> > When reading through avro documentation, for example [1], I
>>> understood, that schema evolution is supported, and if I added column with
>>> specified default, it should be backwards compatible (and even forward when
>>> I remove it again). Sounds great, so I added column defined as:
>>> >> >
>>> >> >         {
>>> >> >           "name": "newColumn",
>>> >> >           "type": ["null","string"],
>>> >> >           "default": null,
>>> >> >           "doc": "something wrong"
>>> >> >         }
>>> >> >
>>> >> > and try to consumer some topic having this schema from beginning,
>>> it fails with message:
>>> >> >
>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>> >> >     at
>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>> >> >     at org.apache.avro.io
>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>> >> >     at org.apache.avro.io
>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> >> >     at
>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>> >> > to give a little bit more information. Avro schema defines one top
>>> level type, having 2 fields. String describing type of message, and union
>>> of N types. All N-1, non-modified types can be read, but one updated with
>>> optional, default-having column cannot be read. I'm not sure if this design
>>> is strictly speaking correct, but that's not the point (feel free to
>>> criticise and recommend better approach!). I'm after schema evolution,
>>> which seems not to be working.
>>> >> >
>>> >> >
>>> >> > And if we alter type definition to:
>>> >> >
>>> >> > "type": "string",
>>> >> > "default": ""
>>> >> > it still does not work and generated error is:
>>> >> >
>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>>> Length is negative: -1
>>> >> >     at org.apache.avro.io
>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>> >> >     at org.apache.avro.io
>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>> >> >     at org.apache.avro.io
>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> >> >     at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> >> >     at
>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>> >> >
>>> >> > Am I doing something wrong?
>>> >> >
>>> >> > thanks,
>>> >> > Martin.
>>> >> >
>>> >> > [1]
>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>
>>