You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by Colin Williams <co...@gmail.com> on 2020/09/18 06:27:34 UTC

Fwd: working with Avro records and schemas, programmatically

Hello,

I'm trying to understand working with Avro records and schemas,
programmatically. Then I was first trying to create a new schema and
records based on existing records, but with a different name /
namespace. It seems then I don't understand getFields() or
createRecord(...). Why can't I use the fields obtained from
getFields() in createRecord()?  How would I go about this properly?

// for an existing record already present
GenericRecord someRecord

// get a list of existing fields
List<Schema.Field> existingFields = someRecord.getSchema().getFields();

// schema for new record with existing fields
Schema updatedSchema = createRecord("UpdatedName",
"","avro.com.example.namespace" , false, existingFields);

^^ throws an exception ^^

/* Caused by: org.apache.avro.AvroRuntimeException: Field already
used: eventMetadata type:UNION pos:0
at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
at org.apache.avro.Schema.createRecord(Schema.java:217)
*/

final int length = fields.size();

GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
for (int i = 0; i < length; i++) {
    final Schema.Field field = existingFields.get(i);
    clonedRecord.put(i, someRecord.get(i));
}


Best Regards,

Colin Williams

Re: working with Avro records and schemas, programmatically

Posted by Mika Ristimaki <mi...@gmail.com>.
The exception means that you have inserted a value that has invalid type
according to the schema. The schema says that the value in that field
must be union of null and `some record` (the record name is removed from
your stacktrace so I don't know what record that is). It means that the
you can only insert values of type `null` and `some record`

For example:

The code below throws "org.apache.avro.UnresolvedUnionException: Not in
union ["null","string"] true" because I have inserted a value of type
`boolean` to a field that only accepts `null` or `string`

Schema schema = SchemaBuilder.record("test")
    .namespace("foo")
    .fields()
        .name("bar").type()
            .unionOf()
                .nullType()
                .and()
                .stringType()
            .endUnion()
            .noDefault()
    .endRecord();

GenericRecord record = new GenericData.Record(schema);
record.put("bar", true);

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
Encoder binaryEncoder =
        EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
writer.write(record, binaryEncoder);

but 

GenericRecord record = new GenericData.Record(schema);
record.put("bar", "baz");

or

GenericRecord record = new GenericData.Record(schema);
record.put("bar", null);

would work fine.

-Mika
 
On Sep 22 2020, at 7:26 pm, Colin Williams
<co...@gmail.com> wrote:

> I am still curious regarding
> 
> Throws UnresolvedUnionException ```Caused by:
> org.apache.avro.UnresolvedUnionException: Not in union
> ["null",{"type":"record","name"...
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> ```
> 
> the copy exeption since I have since resolved creating the record with
> a Schema.
> 
> 
> Thanks,
> 
> Colin


Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
I am still curious regarding

Throws UnresolvedUnionException ```Caused by:
org.apache.avro.UnresolvedUnionException: Not in union
["null",{"type":"record","name"...
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
```

the copy exeption since I have since resolved creating the record with a Schema.


Thanks,

Colin

On Mon, Sep 21, 2020 at 10:46 PM Mika Ristimaki
<mi...@gmail.com> wrote:
>
> Oh you are running this in Spark. Even the newest version of Spark Core
> (3.0.1) has dependency to Avro 1.8.2. And the copy constructor that Ryan
> was talking about was added in Avro 1.9. Although I don't understand how this
>
> List<Schema.Field> clonedFields = existingFields.stream()
>      .map(f -> new Schema.Field(f, f.schema()))
>      .collect(Collectors.toList());
>
> throws NPE since it shouldn't even compile, because such constructor
> shouldn't exist.
>
> -Mika
>
> On Sep 22 2020, at 7:59 am, Colin Williams
> <co...@gmail.com> wrote:
>
> > *I tried your copy also and got the same UnresolvedUnionException*
> >
> > On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
> > <co...@gmail.com> wrote:
> >>
> >> Hi, Mika
> >>
> >> Sorry I wasn't more clear in my previous emails. For the time since
> >> after my first email because I was getting a NPE when working with the
> >> Schema, I was able to fetch the Schema elsewhere and apply it with
> >>
> >> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> >>
> >> Then I put aside that particular issue for the time being.
> >>
> >>
> >> Then I provided part of the related exception when attempting to write
> >> the copy. Unfortunately I can't currently provide the schema but maybe
> >> I will figure out a way to provide a reproduction eventually.
> >>
> >> E.G. Throws UnresolvedUnionException ```Caused by:
> >> org.apache.avro.UnresolvedUnionException: Not in union
> >> ["null",{"type":"record","name"...
> >> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
> >> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
> >> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> >> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> >> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> >> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> >> ```
> >>
> >> I tried your copy also and got an NPE. I thought that it might be
> >> somewhat obvious from the exception message and details what is
> >> causing the exception.
> >>
> >> It might be that I can't do this within a Map operation in a
> >> distributed platform like Spark?
> >>
> >>
> >> Thanks and Best Regards,
> >>
> >> Colin
> >>
> >>
> >>
> >> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki
> >> <mi...@gmail.com> wrote:
> >> >
> >> > Hi Colin,
> >> >
> >> > You are omitting some details from your mail (such as the actual schema)
> >> > but I am suspecting in your first email you tried to do something
> >> like this
> >> >
> >> >  Schema SCHEMA = SchemaBuilder
> >> >                 .record("test")
> >> >                 .namespace("foo")
> >> >                 .fields()
> >> >                     .name("foo").type().stringType().noDefault()
> >> >                     .name("bar").type().optional().stringType()
> >> >                 .endRecord();
> >> >
> >> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> >> > castRecord.put("foo", "foo value");
> >> > castRecord.put("bar", "bar value");
> >> >
> >> > Schema clonedSchema = Schema.createRecord("othertest",
> >> >                 "",
> >> >                 "bar",
> >> >                 false,
> >> >                 SCHEMA.getFields());
> >> >
> >> > And this didn't work because as Ryan explained schema fields cannot be
> >> > reused. So he suggested that the fields should be copied first.
> >> >
> >> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >> >                 .map(f -> new Schema.Field(f, f.schema()))
> >> >                 .collect(Collectors.toList());
> >> >
> >> > I tested it and it works fine. You should use it like this:
> >> >
> >> > Schema SCHEMA = SchemaBuilder
> >> >         .record("test")
> >> >         .namespace("foo")
> >> >         .fields()
> >> >             .name("foo").type().stringType().noDefault()
> >> >             .name("bar").type().optional().stringType()
> >> >         .endRecord();
> >> >
> >> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> >> > castRecord.put("foo", "foo value");
> >> > castRecord.put("bar", "bar value");
> >> >
> >> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >> >         .map(f -> new Schema.Field(f, f.schema()))
> >> >         .collect(Collectors.toList());
> >> > Schema clonedSchema = Schema.createRecord("othertest",
> >> >         "",
> >> >         "bar",
> >> >         false,
> >> >         clonedFields);
> >> >
> >> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> >> > for (int i = 0; i < clonedFields.size(); i++) {
> >> >     clonedRecord.put(i, castRecord.get(i));
> >> > }
> >> >
> >> > GenericDatumWriter<GenericRecord> writer = new
> >> >         GenericDatumWriter<GenericRecord>(SCHEMA);
> >> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> >> > Encoder binaryEncoder =
> >> >
> >> EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> >> > writer.write(clonedRecord, binaryEncoder);
> >> >
> >> > Also for more robustness against reordered fields in clonedSchema, you
> >> > should probably do the data copying something like this:
> >> >
> >> > for (Schema.Field field : castRecord.getSchema().getFields()) {
> >> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
> >> > }
> >> >
> >> >
> >> > On Sep 20 2020, at 7:33 am, Colin Williams
> >> > <co...@gmail.com> wrote:
> >> >
> >> > > I found a way to get my record schema so I'm no longer throwing the
> >> > > same exception. I am now able to put the record indexes for the cloned
> >> > > record.
> >> > >
> >> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> >> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> >> > > clonedRecord.getSchema().getFields();
> >> > > int length = SCHEMA.getFields().size();
> >> > > for (int i = 0; i < length; i++) {
> >> > >    clonedRecord.put(i, castRecord.get(i));
> >> > > }
> >> > >
> >> > > GenericDatumWriter<GenericRecord> writer = new
> >> > > GenericDatumWriter<GenericRecord>(SCHEMA);
> >> > > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> >> > > Encoder binaryEncoder =
> >> > > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> >> > > writer.write(clonedRecord, binaryEncoder);
> >> > >
> >> > > ^^ Throws UnresolvedUnionException ```Caused by:
> >> > > org.apache.avro.UnresolvedUnionException: Not in union
> >> > > ["null",{"type":"record","name"```
> >> > >
> >> > > The schema is identical between the records, with the exception
> >> of the
> >> > > name and namespace. Then I don't understand why I'm getting the
> >> > > exception. I can write with the castRecord and it's schema. Is
> >> this a
> >> > > byte alignment issue caused by the name and namespace? I didn't see
> >> > > them in the record indexes.
> >> > >
> >> > >
> >> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> >> > > <co...@gmail.com> wrote:
> >> > >>
> >> > >> I gave it a shot but from a MapFunction that supposed to be
> >> > >> serializable. I got an NPE.
> >> > >>
> >> > >> I assume I can't create a new Schema.Field from within that
> >> > >> MapFunction . I didn't seem to have trouble accessing the existing
> >> > >> schema fields.
> >> > >>
> >> > >> > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >>         .map(f -> new Schema.Field(f, f.schema()))
> >> > >>         .collect(Collectors.toList());
> >> > >>
> >> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> >> > >> <co...@gmail.com> wrote:
> >> > >> >
> >> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> >> > >> > design of AVRO suggests that data and schemas are very planned things.
> >> > >> > That changes are planned through versioning and we don't like
> >> > >> > duplicated schemas (when the positioning makes sense).
> >> > >> >
> >> > >> > I have a round about way of learning. Sometimes I am working
> >> with data
> >> > >> > and I think it's convenient to transform my data
> >> programmatically and
> >> > >> > try to obtain a schema from that. Also I think that schemas
> >> can become
> >> > >> > cumbersome when many fields are involved in intricate patterns.
> >> > >> >
> >> > >> > I think maybe there are other forms maybe more well suited for that.
> >> > >> >
> >> > >> > Regarding your proposals 1,2 seem reasonable to me. But
> >> someone like
> >> > >> > myself might also not fully understand the design of AVRO.
> >> > >> > A better exception or some kind of lead for armchair
> >> programmers to
> >> > >> > better understand the exception. Thanks for mentioning the copy
> >> > >> > operation.
> >> > >> >
> >> > >> > Finally I do see something about aliases.
> >> > >> >
> >> > >> > Thanks,
> >> > >> >
> >> > >> > Colin
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
> >> > >> > >
> >> > >> > > Hello Colin, you've hit one bit of fussiness with the Java
> >> SDK... you
> >> > >> > > can't reuse a Schema.Field object in two Records, because a field
> >> > >> > > knows its own position in the record[1].  If a field were to
> >> > >> belong to
> >> > >> > > two records at different positions, this method would have an
> >> > >> > > ambiguous response.
> >> > >> > >
> >> > >> > > As a workaround, since Avro 1.9, there's a copy constructor
> >> that you
> >> > >> > > can use to clone the field:
> >> > >> > >
> >> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >> > >         .map(f -> new Schema.Field(f, f.schema()))
> >> > >> > >         .collect(Collectors.toList());
> >> > >> > >
> >> > >> > > That being said, I don't see any reason we MUST throw an exception.
> >> > >> > > There's a couple of alternative strategies we could use in
> >> the Java
> >> > >> > > SDK:
> >> > >> > >
> >> > >> > > 1. If the position is the same in both records, allow the field
> >> > >> to be
> >> > >> > > reused (which enables cloning use cases).
> >> > >> > >
> >> > >> > > 2. Make a copy of the field to reuse internally if the
> >> position is
> >> > >> > > already set (probably OK, since it's supposed to be immutable).
> >> > >> > >
> >> > >> > > 3. Allow the field to be reused, only throw the exception
> >> only if
> >> > >> > > someone calls the position() method later.
> >> > >> > >
> >> > >> > > Any of those sound like a useful change for your use case?  Don't
> >> > >> > > hesitate to create a JIRA or contribution if you like!
> >> > >> > >
> >> > >> > > All my best, Ryan
> >> > >> > >
> >> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> >> > >> > > <co...@gmail.com> wrote:
> >> > >> > > >
> >> > >> > > > Hello,
> >> > >> > > >
> >> > >> > > > I'm trying to understand working with Avro records and schemas,
> >> > >> > > > programmatically. Then I was first trying to create a new
> >> > >> schema and
> >> > >> > > > records based on existing records, but with a different
> >> name /
> >> > >> > > > namespace. It seems then I don't understand getFields() or
> >> > >> > > > createRecord(...). Why can't I use the fields obtained from
> >> > >> > > > getFields() in createRecord()?  How would I go about this properly?
> >> > >> > > >
> >> > >> > > > // for an existing record already present
> >> > >> > > > GenericRecord someRecord
> >> > >> > > >
> >> > >> > > > // get a list of existing fields
> >> > >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> >> > >> > > >
> >> > >> > > > // schema for new record with existing fields
> >> > >> > > > Schema updatedSchema = createRecord("UpdatedName",
> >> > >> > > > "","avro.com.example.namespace" , false, existingFields);
> >> > >> > > >
> >> > >> > > > ^^ throws an exception ^^
> >> > >> > > >
> >> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> >> > >> > > > used: eventMetadata type:UNION pos:0
> >> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> >> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> >> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> >> > >> > > > */
> >> > >> > > >
> >> > >> > > > final int length = fields.size();
> >> > >> > > >
> >> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> >> > >> > > > for (int i = 0; i < length; i++) {
> >> > >> > > >     final Schema.Field field = existingFields.get(i);
> >> > >> > > >     clonedRecord.put(i, someRecord.get(i));
> >> > >> > > > }
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > Best Regards,
> >> > >> > > >
> >> > >> > > > Colin Williams
> >> > >
> >

Re: working with Avro records and schemas, programmatically

Posted by Mika Ristimaki <mi...@gmail.com>.
Oh you are running this in Spark. Even the newest version of Spark Core
(3.0.1) has dependency to Avro 1.8.2. And the copy constructor that Ryan
was talking about was added in Avro 1.9. Although I don't understand how this

List<Schema.Field> clonedFields = existingFields.stream()
     .map(f -> new Schema.Field(f, f.schema()))
     .collect(Collectors.toList());

throws NPE since it shouldn't even compile, because such constructor
shouldn't exist.

-Mika 

On Sep 22 2020, at 7:59 am, Colin Williams
<co...@gmail.com> wrote:

> *I tried your copy also and got the same UnresolvedUnionException*
> 
> On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
> <co...@gmail.com> wrote:
>> 
>> Hi, Mika
>> 
>> Sorry I wasn't more clear in my previous emails. For the time since
>> after my first email because I was getting a NPE when working with the
>> Schema, I was able to fetch the Schema elsewhere and apply it with
>> 
>> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>> 
>> Then I put aside that particular issue for the time being.
>> 
>> 
>> Then I provided part of the related exception when attempting to write
>> the copy. Unfortunately I can't currently provide the schema but maybe
>> I will figure out a way to provide a reproduction eventually.
>> 
>> E.G. Throws UnresolvedUnionException ```Caused by:
>> org.apache.avro.UnresolvedUnionException: Not in union
>> ["null",{"type":"record","name"...
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
>> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
>> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>> ```
>> 
>> I tried your copy also and got an NPE. I thought that it might be
>> somewhat obvious from the exception message and details what is
>> causing the exception.
>> 
>> It might be that I can't do this within a Map operation in a
>> distributed platform like Spark?
>> 
>> 
>> Thanks and Best Regards,
>> 
>> Colin
>> 
>> 
>> 
>> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki
>> <mi...@gmail.com> wrote:
>> >
>> > Hi Colin,
>> >
>> > You are omitting some details from your mail (such as the actual schema)
>> > but I am suspecting in your first email you tried to do something
>> like this
>> >
>> >  Schema SCHEMA = SchemaBuilder
>> >                 .record("test")
>> >                 .namespace("foo")
>> >                 .fields()
>> >                     .name("foo").type().stringType().noDefault()
>> >                     .name("bar").type().optional().stringType()
>> >                 .endRecord();
>> >
>> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
>> > castRecord.put("foo", "foo value");
>> > castRecord.put("bar", "bar value");
>> >
>> > Schema clonedSchema = Schema.createRecord("othertest",
>> >                 "",
>> >                 "bar",
>> >                 false,
>> >                 SCHEMA.getFields());
>> >
>> > And this didn't work because as Ryan explained schema fields cannot be
>> > reused. So he suggested that the fields should be copied first.
>> >
>> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>> >                 .map(f -> new Schema.Field(f, f.schema()))
>> >                 .collect(Collectors.toList());
>> >
>> > I tested it and it works fine. You should use it like this:
>> >
>> > Schema SCHEMA = SchemaBuilder
>> >         .record("test")
>> >         .namespace("foo")
>> >         .fields()
>> >             .name("foo").type().stringType().noDefault()
>> >             .name("bar").type().optional().stringType()
>> >         .endRecord();
>> >
>> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
>> > castRecord.put("foo", "foo value");
>> > castRecord.put("bar", "bar value");
>> >
>> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>> >         .map(f -> new Schema.Field(f, f.schema()))
>> >         .collect(Collectors.toList());
>> > Schema clonedSchema = Schema.createRecord("othertest",
>> >         "",
>> >         "bar",
>> >         false,
>> >         clonedFields);
>> >
>> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
>> > for (int i = 0; i < clonedFields.size(); i++) {
>> >     clonedRecord.put(i, castRecord.get(i));
>> > }
>> >
>> > GenericDatumWriter<GenericRecord> writer = new
>> >         GenericDatumWriter<GenericRecord>(SCHEMA);
>> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
>> > Encoder binaryEncoder =
>> >        
>> EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
>> > writer.write(clonedRecord, binaryEncoder);
>> >
>> > Also for more robustness against reordered fields in clonedSchema, you
>> > should probably do the data copying something like this:
>> >
>> > for (Schema.Field field : castRecord.getSchema().getFields()) {
>> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
>> > }
>> >
>> >
>> > On Sep 20 2020, at 7:33 am, Colin Williams
>> > <co...@gmail.com> wrote:
>> >
>> > > I found a way to get my record schema so I'm no longer throwing the
>> > > same exception. I am now able to put the record indexes for the cloned
>> > > record.
>> > >
>> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
>> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>> > > clonedRecord.getSchema().getFields();
>> > > int length = SCHEMA.getFields().size();
>> > > for (int i = 0; i < length; i++) {
>> > >    clonedRecord.put(i, castRecord.get(i));
>> > > }
>> > >
>> > > GenericDatumWriter<GenericRecord> writer = new
>> > > GenericDatumWriter<GenericRecord>(SCHEMA);
>> > > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
>> > > Encoder binaryEncoder =
>> > > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
>> > > writer.write(clonedRecord, binaryEncoder);
>> > >
>> > > ^^ Throws UnresolvedUnionException ```Caused by:
>> > > org.apache.avro.UnresolvedUnionException: Not in union
>> > > ["null",{"type":"record","name"```
>> > >
>> > > The schema is identical between the records, with the exception
>> of the
>> > > name and namespace. Then I don't understand why I'm getting the
>> > > exception. I can write with the castRecord and it's schema. Is
>> this a
>> > > byte alignment issue caused by the name and namespace? I didn't see
>> > > them in the record indexes.
>> > >
>> > >
>> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
>> > > <co...@gmail.com> wrote:
>> > >>
>> > >> I gave it a shot but from a MapFunction that supposed to be
>> > >> serializable. I got an NPE.
>> > >>
>> > >> I assume I can't create a new Schema.Field from within that
>> > >> MapFunction . I didn't seem to have trouble accessing the existing
>> > >> schema fields.
>> > >>
>> > >> > List<Schema.Field> clonedFields = existingFields.stream()
>> > >>         .map(f -> new Schema.Field(f, f.schema()))
>> > >>         .collect(Collectors.toList());
>> > >>
>> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
>> > >> <co...@gmail.com> wrote:
>> > >> >
>> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
>> > >> > design of AVRO suggests that data and schemas are very planned things.
>> > >> > That changes are planned through versioning and we don't like
>> > >> > duplicated schemas (when the positioning makes sense).
>> > >> >
>> > >> > I have a round about way of learning. Sometimes I am working
>> with data
>> > >> > and I think it's convenient to transform my data
>> programmatically and
>> > >> > try to obtain a schema from that. Also I think that schemas
>> can become
>> > >> > cumbersome when many fields are involved in intricate patterns.
>> > >> >
>> > >> > I think maybe there are other forms maybe more well suited for that.
>> > >> >
>> > >> > Regarding your proposals 1,2 seem reasonable to me. But
>> someone like
>> > >> > myself might also not fully understand the design of AVRO.
>> > >> > A better exception or some kind of lead for armchair
>> programmers to
>> > >> > better understand the exception. Thanks for mentioning the copy
>> > >> > operation.
>> > >> >
>> > >> > Finally I do see something about aliases.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > Colin
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
>> > >> > >
>> > >> > > Hello Colin, you've hit one bit of fussiness with the Java
>> SDK... you
>> > >> > > can't reuse a Schema.Field object in two Records, because a field
>> > >> > > knows its own position in the record[1].  If a field were to
>> > >> belong to
>> > >> > > two records at different positions, this method would have an
>> > >> > > ambiguous response.
>> > >> > >
>> > >> > > As a workaround, since Avro 1.9, there's a copy constructor
>> that you
>> > >> > > can use to clone the field:
>> > >> > >
>> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
>> > >> > >         .map(f -> new Schema.Field(f, f.schema()))
>> > >> > >         .collect(Collectors.toList());
>> > >> > >
>> > >> > > That being said, I don't see any reason we MUST throw an exception.
>> > >> > > There's a couple of alternative strategies we could use in
>> the Java
>> > >> > > SDK:
>> > >> > >
>> > >> > > 1. If the position is the same in both records, allow the field
>> > >> to be
>> > >> > > reused (which enables cloning use cases).
>> > >> > >
>> > >> > > 2. Make a copy of the field to reuse internally if the
>> position is
>> > >> > > already set (probably OK, since it's supposed to be immutable).
>> > >> > >
>> > >> > > 3. Allow the field to be reused, only throw the exception
>> only if
>> > >> > > someone calls the position() method later.
>> > >> > >
>> > >> > > Any of those sound like a useful change for your use case?  Don't
>> > >> > > hesitate to create a JIRA or contribution if you like!
>> > >> > >
>> > >> > > All my best, Ryan
>> > >> > >
>> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
>> > >> > > <co...@gmail.com> wrote:
>> > >> > > >
>> > >> > > > Hello,
>> > >> > > >
>> > >> > > > I'm trying to understand working with Avro records and schemas,
>> > >> > > > programmatically. Then I was first trying to create a new
>> > >> schema and
>> > >> > > > records based on existing records, but with a different
>> name /
>> > >> > > > namespace. It seems then I don't understand getFields() or
>> > >> > > > createRecord(...). Why can't I use the fields obtained from
>> > >> > > > getFields() in createRecord()?  How would I go about this properly?
>> > >> > > >
>> > >> > > > // for an existing record already present
>> > >> > > > GenericRecord someRecord
>> > >> > > >
>> > >> > > > // get a list of existing fields
>> > >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
>> > >> > > >
>> > >> > > > // schema for new record with existing fields
>> > >> > > > Schema updatedSchema = createRecord("UpdatedName",
>> > >> > > > "","avro.com.example.namespace" , false, existingFields);
>> > >> > > >
>> > >> > > > ^^ throws an exception ^^
>> > >> > > >
>> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
>> > >> > > > used: eventMetadata type:UNION pos:0
>> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
>> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
>> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
>> > >> > > > */
>> > >> > > >
>> > >> > > > final int length = fields.size();
>> > >> > > >
>> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
>> > >> > > > for (int i = 0; i < length; i++) {
>> > >> > > >     final Schema.Field field = existingFields.get(i);
>> > >> > > >     clonedRecord.put(i, someRecord.get(i));
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > Best Regards,
>> > >> > > >
>> > >> > > > Colin Williams
>> > >
> 

Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
*I tried your copy also and got the same UnresolvedUnionException*

On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
<co...@gmail.com> wrote:
>
> Hi, Mika
>
> Sorry I wasn't more clear in my previous emails. For the time since
> after my first email because I was getting a NPE when working with the
> Schema, I was able to fetch the Schema elsewhere and apply it with
>
> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>
> Then I put aside that particular issue for the time being.
>
>
> Then I provided part of the related exception when attempting to write
> the copy. Unfortunately I can't currently provide the schema but maybe
> I will figure out a way to provide a reproduction eventually.
>
> E.G. Throws UnresolvedUnionException ```Caused by:
> org.apache.avro.UnresolvedUnionException: Not in union
> ["null",{"type":"record","name"...
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> ```
>
> I tried your copy also and got an NPE. I thought that it might be
> somewhat obvious from the exception message and details what is
> causing the exception.
>
> It might be that I can't do this within a Map operation in a
> distributed platform like Spark?
>
>
> Thanks and Best Regards,
>
> Colin
>
>
>
> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki <mi...@gmail.com> wrote:
> >
> > Hi Colin,
> >
> > You are omitting some details from your mail (such as the actual schema)
> > but I am suspecting in your first email you tried to do something like this
> >
> >  Schema SCHEMA = SchemaBuilder
> >                 .record("test")
> >                 .namespace("foo")
> >                 .fields()
> >                     .name("foo").type().stringType().noDefault()
> >                     .name("bar").type().optional().stringType()
> >                 .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > Schema clonedSchema = Schema.createRecord("othertest",
> >                 "",
> >                 "bar",
> >                 false,
> >                 SCHEMA.getFields());
> >
> > And this didn't work because as Ryan explained schema fields cannot be
> > reused. So he suggested that the fields should be copied first.
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >                 .map(f -> new Schema.Field(f, f.schema()))
> >                 .collect(Collectors.toList());
> >
> > I tested it and it works fine. You should use it like this:
> >
> > Schema SCHEMA = SchemaBuilder
> >         .record("test")
> >         .namespace("foo")
> >         .fields()
> >             .name("foo").type().stringType().noDefault()
> >             .name("bar").type().optional().stringType()
> >         .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >         .map(f -> new Schema.Field(f, f.schema()))
> >         .collect(Collectors.toList());
> > Schema clonedSchema = Schema.createRecord("othertest",
> >         "",
> >         "bar",
> >         false,
> >         clonedFields);
> >
> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> > for (int i = 0; i < clonedFields.size(); i++) {
> >     clonedRecord.put(i, castRecord.get(i));
> > }
> >
> > GenericDatumWriter<GenericRecord> writer = new
> >         GenericDatumWriter<GenericRecord>(SCHEMA);
> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> > Encoder binaryEncoder =
> >         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > writer.write(clonedRecord, binaryEncoder);
> >
> > Also for more robustness against reordered fields in clonedSchema, you
> > should probably do the data copying something like this:
> >
> > for (Schema.Field field : castRecord.getSchema().getFields()) {
> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
> > }
> >
> >
> > On Sep 20 2020, at 7:33 am, Colin Williams
> > <co...@gmail.com> wrote:
> >
> > > I found a way to get my record schema so I'm no longer throwing the
> > > same exception. I am now able to put the record indexes for the cloned
> > > record.
> > >
> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> > > clonedRecord.getSchema().getFields();
> > > int length = SCHEMA.getFields().size();
> > > for (int i = 0; i < length; i++) {
> > >    clonedRecord.put(i, castRecord.get(i));
> > > }
> > >
> > > GenericDatumWriter<GenericRecord> writer = new
> > > GenericDatumWriter<GenericRecord>(SCHEMA);
> > > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> > > Encoder binaryEncoder =
> > > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > > writer.write(clonedRecord, binaryEncoder);
> > >
> > > ^^ Throws UnresolvedUnionException ```Caused by:
> > > org.apache.avro.UnresolvedUnionException: Not in union
> > > ["null",{"type":"record","name"```
> > >
> > > The schema is identical between the records, with the exception of the
> > > name and namespace. Then I don't understand why I'm getting the
> > > exception. I can write with the castRecord and it's schema. Is this a
> > > byte alignment issue caused by the name and namespace? I didn't see
> > > them in the record indexes.
> > >
> > >
> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> > > <co...@gmail.com> wrote:
> > >>
> > >> I gave it a shot but from a MapFunction that supposed to be
> > >> serializable. I got an NPE.
> > >>
> > >> I assume I can't create a new Schema.Field from within that
> > >> MapFunction . I didn't seem to have trouble accessing the existing
> > >> schema fields.
> > >>
> > >> > List<Schema.Field> clonedFields = existingFields.stream()
> > >>         .map(f -> new Schema.Field(f, f.schema()))
> > >>         .collect(Collectors.toList());
> > >>
> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> > >> <co...@gmail.com> wrote:
> > >> >
> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> > >> > design of AVRO suggests that data and schemas are very planned things.
> > >> > That changes are planned through versioning and we don't like
> > >> > duplicated schemas (when the positioning makes sense).
> > >> >
> > >> > I have a round about way of learning. Sometimes I am working with data
> > >> > and I think it's convenient to transform my data programmatically and
> > >> > try to obtain a schema from that. Also I think that schemas can become
> > >> > cumbersome when many fields are involved in intricate patterns.
> > >> >
> > >> > I think maybe there are other forms maybe more well suited for that.
> > >> >
> > >> > Regarding your proposals 1,2 seem reasonable to me. But someone like
> > >> > myself might also not fully understand the design of AVRO.
> > >> > A better exception or some kind of lead for armchair programmers to
> > >> > better understand the exception. Thanks for mentioning the copy
> > >> > operation.
> > >> >
> > >> > Finally I do see something about aliases.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Colin
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
> > >> > >
> > >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> > >> > > can't reuse a Schema.Field object in two Records, because a field
> > >> > > knows its own position in the record[1].  If a field were to
> > >> belong to
> > >> > > two records at different positions, this method would have an
> > >> > > ambiguous response.
> > >> > >
> > >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> > >> > > can use to clone the field:
> > >> > >
> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
> > >> > >         .map(f -> new Schema.Field(f, f.schema()))
> > >> > >         .collect(Collectors.toList());
> > >> > >
> > >> > > That being said, I don't see any reason we MUST throw an exception.
> > >> > > There's a couple of alternative strategies we could use in the Java
> > >> > > SDK:
> > >> > >
> > >> > > 1. If the position is the same in both records, allow the field
> > >> to be
> > >> > > reused (which enables cloning use cases).
> > >> > >
> > >> > > 2. Make a copy of the field to reuse internally if the position is
> > >> > > already set (probably OK, since it's supposed to be immutable).
> > >> > >
> > >> > > 3. Allow the field to be reused, only throw the exception only if
> > >> > > someone calls the position() method later.
> > >> > >
> > >> > > Any of those sound like a useful change for your use case?  Don't
> > >> > > hesitate to create a JIRA or contribution if you like!
> > >> > >
> > >> > > All my best, Ryan
> > >> > >
> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> > >> > > <co...@gmail.com> wrote:
> > >> > > >
> > >> > > > Hello,
> > >> > > >
> > >> > > > I'm trying to understand working with Avro records and schemas,
> > >> > > > programmatically. Then I was first trying to create a new
> > >> schema and
> > >> > > > records based on existing records, but with a different name /
> > >> > > > namespace. It seems then I don't understand getFields() or
> > >> > > > createRecord(...). Why can't I use the fields obtained from
> > >> > > > getFields() in createRecord()?  How would I go about this properly?
> > >> > > >
> > >> > > > // for an existing record already present
> > >> > > > GenericRecord someRecord
> > >> > > >
> > >> > > > // get a list of existing fields
> > >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> > >> > > >
> > >> > > > // schema for new record with existing fields
> > >> > > > Schema updatedSchema = createRecord("UpdatedName",
> > >> > > > "","avro.com.example.namespace" , false, existingFields);
> > >> > > >
> > >> > > > ^^ throws an exception ^^
> > >> > > >
> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> > >> > > > used: eventMetadata type:UNION pos:0
> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> > >> > > > */
> > >> > > >
> > >> > > > final int length = fields.size();
> > >> > > >
> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > >> > > > for (int i = 0; i < length; i++) {
> > >> > > >     final Schema.Field field = existingFields.get(i);
> > >> > > >     clonedRecord.put(i, someRecord.get(i));
> > >> > > > }
> > >> > > >
> > >> > > >
> > >> > > > Best Regards,
> > >> > > >
> > >> > > > Colin Williams
> > >

Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
Hi, Mika

Sorry I wasn't more clear in my previous emails. For the time since
after my first email because I was getting a NPE when working with the
Schema, I was able to fetch the Schema elsewhere and apply it with

final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);

Then I put aside that particular issue for the time being.


Then I provided part of the related exception when attempting to write
the copy. Unfortunately I can't currently provide the schema but maybe
I will figure out a way to provide a reproduction eventually.

E.G. Throws UnresolvedUnionException ```Caused by:
org.apache.avro.UnresolvedUnionException: Not in union
["null",{"type":"record","name"...
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
```

I tried your copy also and got an NPE. I thought that it might be
somewhat obvious from the exception message and details what is
causing the exception.

It might be that I can't do this within a Map operation in a
distributed platform like Spark?


Thanks and Best Regards,

Colin



On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki <mi...@gmail.com> wrote:
>
> Hi Colin,
>
> You are omitting some details from your mail (such as the actual schema)
> but I am suspecting in your first email you tried to do something like this
>
>  Schema SCHEMA = SchemaBuilder
>                 .record("test")
>                 .namespace("foo")
>                 .fields()
>                     .name("foo").type().stringType().noDefault()
>                     .name("bar").type().optional().stringType()
>                 .endRecord();
>
> GenericRecord castRecord = new GenericData.Record(SCHEMA);
> castRecord.put("foo", "foo value");
> castRecord.put("bar", "bar value");
>
> Schema clonedSchema = Schema.createRecord("othertest",
>                 "",
>                 "bar",
>                 false,
>                 SCHEMA.getFields());
>
> And this didn't work because as Ryan explained schema fields cannot be
> reused. So he suggested that the fields should be copied first.
>
> List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>                 .map(f -> new Schema.Field(f, f.schema()))
>                 .collect(Collectors.toList());
>
> I tested it and it works fine. You should use it like this:
>
> Schema SCHEMA = SchemaBuilder
>         .record("test")
>         .namespace("foo")
>         .fields()
>             .name("foo").type().stringType().noDefault()
>             .name("bar").type().optional().stringType()
>         .endRecord();
>
> GenericRecord castRecord = new GenericData.Record(SCHEMA);
> castRecord.put("foo", "foo value");
> castRecord.put("bar", "bar value");
>
> List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>         .map(f -> new Schema.Field(f, f.schema()))
>         .collect(Collectors.toList());
> Schema clonedSchema = Schema.createRecord("othertest",
>         "",
>         "bar",
>         false,
>         clonedFields);
>
> final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> for (int i = 0; i < clonedFields.size(); i++) {
>     clonedRecord.put(i, castRecord.get(i));
> }
>
> GenericDatumWriter<GenericRecord> writer = new
>         GenericDatumWriter<GenericRecord>(SCHEMA);
> ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> Encoder binaryEncoder =
>         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> writer.write(clonedRecord, binaryEncoder);
>
> Also for more robustness against reordered fields in clonedSchema, you
> should probably do the data copying something like this:
>
> for (Schema.Field field : castRecord.getSchema().getFields()) {
>     clonedRecord.put(field.name(), castRecord.get(field.name()));
> }
>
>
> On Sep 20 2020, at 7:33 am, Colin Williams
> <co...@gmail.com> wrote:
>
> > I found a way to get my record schema so I'm no longer throwing the
> > same exception. I am now able to put the record indexes for the cloned
> > record.
> >
> > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> > clonedRecord.getSchema().getFields();
> > int length = SCHEMA.getFields().size();
> > for (int i = 0; i < length; i++) {
> >    clonedRecord.put(i, castRecord.get(i));
> > }
> >
> > GenericDatumWriter<GenericRecord> writer = new
> > GenericDatumWriter<GenericRecord>(SCHEMA);
> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> > Encoder binaryEncoder =
> > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > writer.write(clonedRecord, binaryEncoder);
> >
> > ^^ Throws UnresolvedUnionException ```Caused by:
> > org.apache.avro.UnresolvedUnionException: Not in union
> > ["null",{"type":"record","name"```
> >
> > The schema is identical between the records, with the exception of the
> > name and namespace. Then I don't understand why I'm getting the
> > exception. I can write with the castRecord and it's schema. Is this a
> > byte alignment issue caused by the name and namespace? I didn't see
> > them in the record indexes.
> >
> >
> > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> > <co...@gmail.com> wrote:
> >>
> >> I gave it a shot but from a MapFunction that supposed to be
> >> serializable. I got an NPE.
> >>
> >> I assume I can't create a new Schema.Field from within that
> >> MapFunction . I didn't seem to have trouble accessing the existing
> >> schema fields.
> >>
> >> > List<Schema.Field> clonedFields = existingFields.stream()
> >>         .map(f -> new Schema.Field(f, f.schema()))
> >>         .collect(Collectors.toList());
> >>
> >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> >> <co...@gmail.com> wrote:
> >> >
> >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> >> > design of AVRO suggests that data and schemas are very planned things.
> >> > That changes are planned through versioning and we don't like
> >> > duplicated schemas (when the positioning makes sense).
> >> >
> >> > I have a round about way of learning. Sometimes I am working with data
> >> > and I think it's convenient to transform my data programmatically and
> >> > try to obtain a schema from that. Also I think that schemas can become
> >> > cumbersome when many fields are involved in intricate patterns.
> >> >
> >> > I think maybe there are other forms maybe more well suited for that.
> >> >
> >> > Regarding your proposals 1,2 seem reasonable to me. But someone like
> >> > myself might also not fully understand the design of AVRO.
> >> > A better exception or some kind of lead for armchair programmers to
> >> > better understand the exception. Thanks for mentioning the copy
> >> > operation.
> >> >
> >> > Finally I do see something about aliases.
> >> >
> >> > Thanks,
> >> >
> >> > Colin
> >> >
> >> >
> >> >
> >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
> >> > >
> >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> >> > > can't reuse a Schema.Field object in two Records, because a field
> >> > > knows its own position in the record[1].  If a field were to
> >> belong to
> >> > > two records at different positions, this method would have an
> >> > > ambiguous response.
> >> > >
> >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> >> > > can use to clone the field:
> >> > >
> >> > > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >         .map(f -> new Schema.Field(f, f.schema()))
> >> > >         .collect(Collectors.toList());
> >> > >
> >> > > That being said, I don't see any reason we MUST throw an exception.
> >> > > There's a couple of alternative strategies we could use in the Java
> >> > > SDK:
> >> > >
> >> > > 1. If the position is the same in both records, allow the field
> >> to be
> >> > > reused (which enables cloning use cases).
> >> > >
> >> > > 2. Make a copy of the field to reuse internally if the position is
> >> > > already set (probably OK, since it's supposed to be immutable).
> >> > >
> >> > > 3. Allow the field to be reused, only throw the exception only if
> >> > > someone calls the position() method later.
> >> > >
> >> > > Any of those sound like a useful change for your use case?  Don't
> >> > > hesitate to create a JIRA or contribution if you like!
> >> > >
> >> > > All my best, Ryan
> >> > >
> >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> >> > > <co...@gmail.com> wrote:
> >> > > >
> >> > > > Hello,
> >> > > >
> >> > > > I'm trying to understand working with Avro records and schemas,
> >> > > > programmatically. Then I was first trying to create a new
> >> schema and
> >> > > > records based on existing records, but with a different name /
> >> > > > namespace. It seems then I don't understand getFields() or
> >> > > > createRecord(...). Why can't I use the fields obtained from
> >> > > > getFields() in createRecord()?  How would I go about this properly?
> >> > > >
> >> > > > // for an existing record already present
> >> > > > GenericRecord someRecord
> >> > > >
> >> > > > // get a list of existing fields
> >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> >> > > >
> >> > > > // schema for new record with existing fields
> >> > > > Schema updatedSchema = createRecord("UpdatedName",
> >> > > > "","avro.com.example.namespace" , false, existingFields);
> >> > > >
> >> > > > ^^ throws an exception ^^
> >> > > >
> >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> >> > > > used: eventMetadata type:UNION pos:0
> >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> >> > > > */
> >> > > >
> >> > > > final int length = fields.size();
> >> > > >
> >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> >> > > > for (int i = 0; i < length; i++) {
> >> > > >     final Schema.Field field = existingFields.get(i);
> >> > > >     clonedRecord.put(i, someRecord.get(i));
> >> > > > }
> >> > > >
> >> > > >
> >> > > > Best Regards,
> >> > > >
> >> > > > Colin Williams
> >

Re: working with Avro records and schemas, programmatically

Posted by Mika Ristimaki <mi...@gmail.com>.
Hi Colin,

You are omitting some details from your mail (such as the actual schema)
but I am suspecting in your first email you tried to do something like this

 Schema SCHEMA = SchemaBuilder
                .record("test")
                .namespace("foo")
                .fields()
                    .name("foo").type().stringType().noDefault()
                    .name("bar").type().optional().stringType()
                .endRecord();

GenericRecord castRecord = new GenericData.Record(SCHEMA);
castRecord.put("foo", "foo value");
castRecord.put("bar", "bar value");

Schema clonedSchema = Schema.createRecord("othertest",
                "",
                "bar",
                false,
                SCHEMA.getFields());

And this didn't work because as Ryan explained schema fields cannot be
reused. So he suggested that the fields should be copied first.

List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
                .map(f -> new Schema.Field(f, f.schema()))
                .collect(Collectors.toList());

I tested it and it works fine. You should use it like this:

Schema SCHEMA = SchemaBuilder
        .record("test")
        .namespace("foo")
        .fields()
            .name("foo").type().stringType().noDefault()
            .name("bar").type().optional().stringType()
        .endRecord();

GenericRecord castRecord = new GenericData.Record(SCHEMA);
castRecord.put("foo", "foo value");
castRecord.put("bar", "bar value");

List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
        .map(f -> new Schema.Field(f, f.schema()))
        .collect(Collectors.toList());
Schema clonedSchema = Schema.createRecord("othertest",
        "",
        "bar",
        false,
        clonedFields);

final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
for (int i = 0; i < clonedFields.size(); i++) {
    clonedRecord.put(i, castRecord.get(i));
}

GenericDatumWriter<GenericRecord> writer = new
        GenericDatumWriter<GenericRecord>(SCHEMA);
ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
Encoder binaryEncoder =
        EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
writer.write(clonedRecord, binaryEncoder); 

Also for more robustness against reordered fields in clonedSchema, you
should probably do the data copying something like this:

for (Schema.Field field : castRecord.getSchema().getFields()) {
    clonedRecord.put(field.name(), castRecord.get(field.name()));
}

 
On Sep 20 2020, at 7:33 am, Colin Williams
<co...@gmail.com> wrote:

> I found a way to get my record schema so I'm no longer throwing the
> same exception. I am now able to put the record indexes for the cloned
> record.
> 
> GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> clonedRecord.getSchema().getFields();
> int length = SCHEMA.getFields().size();
> for (int i = 0; i < length; i++) {
>    clonedRecord.put(i, castRecord.get(i));
> }
> 
> GenericDatumWriter<GenericRecord> writer = new
> GenericDatumWriter<GenericRecord>(SCHEMA);
> ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> Encoder binaryEncoder =
> EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> writer.write(clonedRecord, binaryEncoder);
> 
> ^^ Throws UnresolvedUnionException ```Caused by:
> org.apache.avro.UnresolvedUnionException: Not in union
> ["null",{"type":"record","name"```
> 
> The schema is identical between the records, with the exception of the
> name and namespace. Then I don't understand why I'm getting the
> exception. I can write with the castRecord and it's schema. Is this a
> byte alignment issue caused by the name and namespace? I didn't see
> them in the record indexes.
> 
> 
> On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> <co...@gmail.com> wrote:
>> 
>> I gave it a shot but from a MapFunction that supposed to be
>> serializable. I got an NPE.
>> 
>> I assume I can't create a new Schema.Field from within that
>> MapFunction . I didn't seem to have trouble accessing the existing
>> schema fields.
>> 
>> > List<Schema.Field> clonedFields = existingFields.stream()
>>         .map(f -> new Schema.Field(f, f.schema()))
>>         .collect(Collectors.toList());
>> 
>> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
>> <co...@gmail.com> wrote:
>> >
>> > Hi Ryan, Thanks for your explanation. I am thinking now that the
>> > design of AVRO suggests that data and schemas are very planned things.
>> > That changes are planned through versioning and we don't like
>> > duplicated schemas (when the positioning makes sense).
>> >
>> > I have a round about way of learning. Sometimes I am working with data
>> > and I think it's convenient to transform my data programmatically and
>> > try to obtain a schema from that. Also I think that schemas can become
>> > cumbersome when many fields are involved in intricate patterns.
>> >
>> > I think maybe there are other forms maybe more well suited for that.
>> >
>> > Regarding your proposals 1,2 seem reasonable to me. But someone like
>> > myself might also not fully understand the design of AVRO.
>> > A better exception or some kind of lead for armchair programmers to
>> > better understand the exception. Thanks for mentioning the copy
>> > operation.
>> >
>> > Finally I do see something about aliases.
>> >
>> > Thanks,
>> >
>> > Colin
>> >
>> >
>> >
>> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
>> > >
>> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
>> > > can't reuse a Schema.Field object in two Records, because a field
>> > > knows its own position in the record[1].  If a field were to
>> belong to
>> > > two records at different positions, this method would have an
>> > > ambiguous response.
>> > >
>> > > As a workaround, since Avro 1.9, there's a copy constructor that you
>> > > can use to clone the field:
>> > >
>> > > List<Schema.Field> clonedFields = existingFields.stream()
>> > >         .map(f -> new Schema.Field(f, f.schema()))
>> > >         .collect(Collectors.toList());
>> > >
>> > > That being said, I don't see any reason we MUST throw an exception.
>> > > There's a couple of alternative strategies we could use in the Java
>> > > SDK:
>> > >
>> > > 1. If the position is the same in both records, allow the field
>> to be
>> > > reused (which enables cloning use cases).
>> > >
>> > > 2. Make a copy of the field to reuse internally if the position is
>> > > already set (probably OK, since it's supposed to be immutable).
>> > >
>> > > 3. Allow the field to be reused, only throw the exception only if
>> > > someone calls the position() method later.
>> > >
>> > > Any of those sound like a useful change for your use case?  Don't
>> > > hesitate to create a JIRA or contribution if you like!
>> > >
>> > > All my best, Ryan
>> > >
>> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
>> > > <co...@gmail.com> wrote:
>> > > >
>> > > > Hello,
>> > > >
>> > > > I'm trying to understand working with Avro records and schemas,
>> > > > programmatically. Then I was first trying to create a new
>> schema and
>> > > > records based on existing records, but with a different name /
>> > > > namespace. It seems then I don't understand getFields() or
>> > > > createRecord(...). Why can't I use the fields obtained from
>> > > > getFields() in createRecord()?  How would I go about this properly?
>> > > >
>> > > > // for an existing record already present
>> > > > GenericRecord someRecord
>> > > >
>> > > > // get a list of existing fields
>> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
>> > > >
>> > > > // schema for new record with existing fields
>> > > > Schema updatedSchema = createRecord("UpdatedName",
>> > > > "","avro.com.example.namespace" , false, existingFields);
>> > > >
>> > > > ^^ throws an exception ^^
>> > > >
>> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
>> > > > used: eventMetadata type:UNION pos:0
>> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
>> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
>> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
>> > > > */
>> > > >
>> > > > final int length = fields.size();
>> > > >
>> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
>> > > > for (int i = 0; i < length; i++) {
>> > > >     final Schema.Field field = existingFields.get(i);
>> > > >     clonedRecord.put(i, someRecord.get(i));
>> > > > }
>> > > >
>> > > >
>> > > > Best Regards,
>> > > >
>> > > > Colin Williams
> 

Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
I found a way to get my record schema so I'm no longer throwing the
same exception. I am now able to put the record indexes for the cloned
record.

GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
clonedRecord.getSchema().getFields();
int length = SCHEMA.getFields().size();
for (int i = 0; i < length; i++) {
    clonedRecord.put(i, castRecord.get(i));
}

GenericDatumWriter<GenericRecord> writer = new
GenericDatumWriter<GenericRecord>(SCHEMA);
ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
Encoder binaryEncoder =
EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
writer.write(clonedRecord, binaryEncoder);

^^ Throws UnresolvedUnionException ```Caused by:
org.apache.avro.UnresolvedUnionException: Not in union
["null",{"type":"record","name"```

The schema is identical between the records, with the exception of the
name and namespace. Then I don't understand why I'm getting the
exception. I can write with the castRecord and it's schema. Is this a
byte alignment issue caused by the name and namespace? I didn't see
them in the record indexes.


On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
<co...@gmail.com> wrote:
>
> I gave it a shot but from a MapFunction that supposed to be
> serializable. I got an NPE.
>
> I assume I can't create a new Schema.Field from within that
> MapFunction . I didn't seem to have trouble accessing the existing
> schema fields.
>
> > List<Schema.Field> clonedFields = existingFields.stream()
>         .map(f -> new Schema.Field(f, f.schema()))
>         .collect(Collectors.toList());
>
> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> <co...@gmail.com> wrote:
> >
> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> > design of AVRO suggests that data and schemas are very planned things.
> > That changes are planned through versioning and we don't like
> > duplicated schemas (when the positioning makes sense).
> >
> > I have a round about way of learning. Sometimes I am working with data
> > and I think it's convenient to transform my data programmatically and
> > try to obtain a schema from that. Also I think that schemas can become
> > cumbersome when many fields are involved in intricate patterns.
> >
> > I think maybe there are other forms maybe more well suited for that.
> >
> > Regarding your proposals 1,2 seem reasonable to me. But someone like
> > myself might also not fully understand the design of AVRO.
> > A better exception or some kind of lead for armchair programmers to
> > better understand the exception. Thanks for mentioning the copy
> > operation.
> >
> > Finally I do see something about aliases.
> >
> > Thanks,
> >
> > Colin
> >
> >
> >
> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
> > >
> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> > > can't reuse a Schema.Field object in two Records, because a field
> > > knows its own position in the record[1].  If a field were to belong to
> > > two records at different positions, this method would have an
> > > ambiguous response.
> > >
> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> > > can use to clone the field:
> > >
> > > List<Schema.Field> clonedFields = existingFields.stream()
> > >         .map(f -> new Schema.Field(f, f.schema()))
> > >         .collect(Collectors.toList());
> > >
> > > That being said, I don't see any reason we MUST throw an exception.
> > > There's a couple of alternative strategies we could use in the Java
> > > SDK:
> > >
> > > 1. If the position is the same in both records, allow the field to be
> > > reused (which enables cloning use cases).
> > >
> > > 2. Make a copy of the field to reuse internally if the position is
> > > already set (probably OK, since it's supposed to be immutable).
> > >
> > > 3. Allow the field to be reused, only throw the exception only if
> > > someone calls the position() method later.
> > >
> > > Any of those sound like a useful change for your use case?  Don't
> > > hesitate to create a JIRA or contribution if you like!
> > >
> > > All my best, Ryan
> > >
> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> > > <co...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > > I'm trying to understand working with Avro records and schemas,
> > > > programmatically. Then I was first trying to create a new schema and
> > > > records based on existing records, but with a different name /
> > > > namespace. It seems then I don't understand getFields() or
> > > > createRecord(...). Why can't I use the fields obtained from
> > > > getFields() in createRecord()?  How would I go about this properly?
> > > >
> > > > // for an existing record already present
> > > > GenericRecord someRecord
> > > >
> > > > // get a list of existing fields
> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> > > >
> > > > // schema for new record with existing fields
> > > > Schema updatedSchema = createRecord("UpdatedName",
> > > > "","avro.com.example.namespace" , false, existingFields);
> > > >
> > > > ^^ throws an exception ^^
> > > >
> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> > > > used: eventMetadata type:UNION pos:0
> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> > > > */
> > > >
> > > > final int length = fields.size();
> > > >
> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > > > for (int i = 0; i < length; i++) {
> > > >     final Schema.Field field = existingFields.get(i);
> > > >     clonedRecord.put(i, someRecord.get(i));
> > > > }
> > > >
> > > >
> > > > Best Regards,
> > > >
> > > > Colin Williams

Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
I gave it a shot but from a MapFunction that supposed to be
serializable. I got an NPE.

I assume I can't create a new Schema.Field from within that
MapFunction . I didn't seem to have trouble accessing the existing
schema fields.

> List<Schema.Field> clonedFields = existingFields.stream()
        .map(f -> new Schema.Field(f, f.schema()))
        .collect(Collectors.toList());

On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
<co...@gmail.com> wrote:
>
> Hi Ryan, Thanks for your explanation. I am thinking now that the
> design of AVRO suggests that data and schemas are very planned things.
> That changes are planned through versioning and we don't like
> duplicated schemas (when the positioning makes sense).
>
> I have a round about way of learning. Sometimes I am working with data
> and I think it's convenient to transform my data programmatically and
> try to obtain a schema from that. Also I think that schemas can become
> cumbersome when many fields are involved in intricate patterns.
>
> I think maybe there are other forms maybe more well suited for that.
>
> Regarding your proposals 1,2 seem reasonable to me. But someone like
> myself might also not fully understand the design of AVRO.
> A better exception or some kind of lead for armchair programmers to
> better understand the exception. Thanks for mentioning the copy
> operation.
>
> Finally I do see something about aliases.
>
> Thanks,
>
> Colin
>
>
>
> On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
> >
> > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> > can't reuse a Schema.Field object in two Records, because a field
> > knows its own position in the record[1].  If a field were to belong to
> > two records at different positions, this method would have an
> > ambiguous response.
> >
> > As a workaround, since Avro 1.9, there's a copy constructor that you
> > can use to clone the field:
> >
> > List<Schema.Field> clonedFields = existingFields.stream()
> >         .map(f -> new Schema.Field(f, f.schema()))
> >         .collect(Collectors.toList());
> >
> > That being said, I don't see any reason we MUST throw an exception.
> > There's a couple of alternative strategies we could use in the Java
> > SDK:
> >
> > 1. If the position is the same in both records, allow the field to be
> > reused (which enables cloning use cases).
> >
> > 2. Make a copy of the field to reuse internally if the position is
> > already set (probably OK, since it's supposed to be immutable).
> >
> > 3. Allow the field to be reused, only throw the exception only if
> > someone calls the position() method later.
> >
> > Any of those sound like a useful change for your use case?  Don't
> > hesitate to create a JIRA or contribution if you like!
> >
> > All my best, Ryan
> >
> > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> > <co...@gmail.com> wrote:
> > >
> > > Hello,
> > >
> > > I'm trying to understand working with Avro records and schemas,
> > > programmatically. Then I was first trying to create a new schema and
> > > records based on existing records, but with a different name /
> > > namespace. It seems then I don't understand getFields() or
> > > createRecord(...). Why can't I use the fields obtained from
> > > getFields() in createRecord()?  How would I go about this properly?
> > >
> > > // for an existing record already present
> > > GenericRecord someRecord
> > >
> > > // get a list of existing fields
> > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> > >
> > > // schema for new record with existing fields
> > > Schema updatedSchema = createRecord("UpdatedName",
> > > "","avro.com.example.namespace" , false, existingFields);
> > >
> > > ^^ throws an exception ^^
> > >
> > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> > > used: eventMetadata type:UNION pos:0
> > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> > > */
> > >
> > > final int length = fields.size();
> > >
> > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > > for (int i = 0; i < length; i++) {
> > >     final Schema.Field field = existingFields.get(i);
> > >     clonedRecord.put(i, someRecord.get(i));
> > > }
> > >
> > >
> > > Best Regards,
> > >
> > > Colin Williams

Re: working with Avro records and schemas, programmatically

Posted by Colin Williams <co...@gmail.com>.
Hi Ryan, Thanks for your explanation. I am thinking now that the
design of AVRO suggests that data and schemas are very planned things.
That changes are planned through versioning and we don't like
duplicated schemas (when the positioning makes sense).

I have a round about way of learning. Sometimes I am working with data
and I think it's convenient to transform my data programmatically and
try to obtain a schema from that. Also I think that schemas can become
cumbersome when many fields are involved in intricate patterns.

I think maybe there are other forms maybe more well suited for that.

Regarding your proposals 1,2 seem reasonable to me. But someone like
myself might also not fully understand the design of AVRO.
A better exception or some kind of lead for armchair programmers to
better understand the exception. Thanks for mentioning the copy
operation.

Finally I do see something about aliases.

Thanks,

Colin



On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <ry...@skraba.com> wrote:
>
> Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> can't reuse a Schema.Field object in two Records, because a field
> knows its own position in the record[1].  If a field were to belong to
> two records at different positions, this method would have an
> ambiguous response.
>
> As a workaround, since Avro 1.9, there's a copy constructor that you
> can use to clone the field:
>
> List<Schema.Field> clonedFields = existingFields.stream()
>         .map(f -> new Schema.Field(f, f.schema()))
>         .collect(Collectors.toList());
>
> That being said, I don't see any reason we MUST throw an exception.
> There's a couple of alternative strategies we could use in the Java
> SDK:
>
> 1. If the position is the same in both records, allow the field to be
> reused (which enables cloning use cases).
>
> 2. Make a copy of the field to reuse internally if the position is
> already set (probably OK, since it's supposed to be immutable).
>
> 3. Allow the field to be reused, only throw the exception only if
> someone calls the position() method later.
>
> Any of those sound like a useful change for your use case?  Don't
> hesitate to create a JIRA or contribution if you like!
>
> All my best, Ryan
>
> On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> <co...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to understand working with Avro records and schemas,
> > programmatically. Then I was first trying to create a new schema and
> > records based on existing records, but with a different name /
> > namespace. It seems then I don't understand getFields() or
> > createRecord(...). Why can't I use the fields obtained from
> > getFields() in createRecord()?  How would I go about this properly?
> >
> > // for an existing record already present
> > GenericRecord someRecord
> >
> > // get a list of existing fields
> > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> >
> > // schema for new record with existing fields
> > Schema updatedSchema = createRecord("UpdatedName",
> > "","avro.com.example.namespace" , false, existingFields);
> >
> > ^^ throws an exception ^^
> >
> > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> > used: eventMetadata type:UNION pos:0
> > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > at org.apache.avro.Schema.createRecord(Schema.java:217)
> > */
> >
> > final int length = fields.size();
> >
> > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > for (int i = 0; i < length; i++) {
> >     final Schema.Field field = existingFields.get(i);
> >     clonedRecord.put(i, someRecord.get(i));
> > }
> >
> >
> > Best Regards,
> >
> > Colin Williams

Re: working with Avro records and schemas, programmatically

Posted by Ryan Skraba <ry...@skraba.com>.
Hello Colin, you've hit one bit of fussiness with the Java SDK... you
can't reuse a Schema.Field object in two Records, because a field
knows its own position in the record[1].  If a field were to belong to
two records at different positions, this method would have an
ambiguous response.

As a workaround, since Avro 1.9, there's a copy constructor that you
can use to clone the field:

List<Schema.Field> clonedFields = existingFields.stream()
        .map(f -> new Schema.Field(f, f.schema()))
        .collect(Collectors.toList());

That being said, I don't see any reason we MUST throw an exception.
There's a couple of alternative strategies we could use in the Java
SDK:

1. If the position is the same in both records, allow the field to be
reused (which enables cloning use cases).

2. Make a copy of the field to reuse internally if the position is
already set (probably OK, since it's supposed to be immutable).

3. Allow the field to be reused, only throw the exception only if
someone calls the position() method later.

Any of those sound like a useful change for your use case?  Don't
hesitate to create a JIRA or contribution if you like!

All my best, Ryan

On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
<co...@gmail.com> wrote:
>
> Hello,
>
> I'm trying to understand working with Avro records and schemas,
> programmatically. Then I was first trying to create a new schema and
> records based on existing records, but with a different name /
> namespace. It seems then I don't understand getFields() or
> createRecord(...). Why can't I use the fields obtained from
> getFields() in createRecord()?  How would I go about this properly?
>
> // for an existing record already present
> GenericRecord someRecord
>
> // get a list of existing fields
> List<Schema.Field> existingFields = someRecord.getSchema().getFields();
>
> // schema for new record with existing fields
> Schema updatedSchema = createRecord("UpdatedName",
> "","avro.com.example.namespace" , false, existingFields);
>
> ^^ throws an exception ^^
>
> /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> used: eventMetadata type:UNION pos:0
> at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> at org.apache.avro.Schema.createRecord(Schema.java:217)
> */
>
> final int length = fields.size();
>
> GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> for (int i = 0; i < length; i++) {
>     final Schema.Field field = existingFields.get(i);
>     clonedRecord.put(i, someRecord.get(i));
> }
>
>
> Best Regards,
>
> Colin Williams