Posted to user@beam.apache.org by Brian Hulette <bh...@google.com> on 2020/08/19 20:52:25 UTC

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

It looks like this is occurring because we don't actually support mixing
SchemaProviders in nested types. The current SchemaProvider implementations
only support nested types for homogeneous types (e.g. an AutoValue with an
AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for
the outer type (EnrichedArticle), it is also used recursively for the inner
type (ArticleEnvelope), rather than the registered ProtoMessageSchema.

I filed BEAM-10765 [1] to add support for inferring schemas for
non-homogeneous types; I think it's something we should be able to support.
I know it's been a while since you reported this. Have you found a
workaround in the meantime? Your best bet for now may be to avoid
ProtoMessageSchema for the inner class and use the same style of class for
both the outer and inner types, by creating a POJO or AutoValue that
replicates the ArticleEnvelope class.


Luke: Regarding recursive schemas, Reuven and I have had some discussions
about it offline. I think he said it should be feasible but I don't know
much beyond that.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10765

On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> I want to make my example as simple as possible while not leaving out the
> details that might be the reason for the error. I don't think there is any
> recursion.
> I can also share the ArticleEnvelope Protobuf file if that helps. I've
> tried to register the ArticleEnvelope schema like this:
>
>     TestPipeline p = TestPipeline.create();
>
>     TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
>         TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
>
>     Schema articleSchema =
>         new ProtoMessageSchema().schemaFor(articleEnvelopeTypeDescriptor);
>
>     SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
>         new ProtoMessageSchema().toRowFunction(articleEnvelopeTypeDescriptor);
>
>     SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
>         new ProtoMessageSchema().fromRowFunction(articleEnvelopeTypeDescriptor);
>
>     p.getSchemaRegistry()
>         .registerSchemaForClass(
>             ArticleProto.ArticleEnvelope.class,
>             articleSchema,
>             articleEnvelopeToRow,
>             articleEnvelopeFromRow);
>
> The problem is that even when I define and register it like above, as soon
> as I annotate the class EnrichedArticle with @DefaultSchema(JavaFieldSchema.class)
> I get:
>
> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown Source)
>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:414)
>
> So the registration does not seem to have an effect when the annotation on
> EnrichedArticle is present. Without the annotation, however, there is no
> schema defined on the output PCollection, so I have to define it myself for
> BigQueryIO to work:
> [The EnrichFn transforms an AssetEnvelope into a Java POJO Asset class and
> enriches it via an RPC call. The Asset has a low number of fields, so the
> manual mapping here is manageable, though I would like to use Beam schemas
> as soon as this problem is solved, which would make the Asset POJO
> obsolete.]
>
>     PCollection<KV<String, AssetProto.AssetEnvelope>> assets =
>         p.apply("Create assets", Create.of(kvAsset));
>
>     PCollection<KV<String, ArticleProto.ArticleEnvelope>> articles =
>         p.apply("Create articles", Create.of(kvArticle));
>
>     TupleTag<ArticleProto.ArticleEnvelope> articleTag = new TupleTag<>();
>     TupleTag<AssetProto.AssetEnvelope> assetTag = new TupleTag<>();
>
>     PCollection<KV<String, CoGbkResult>> joinedCollection =
>         KeyedPCollectionTuple.of(articleTag, articles)
>             .and(assetTag, assets)
>             .apply(CoGroupByKey.<String>create());
>
>     PCollection<EnrichedArticle> output =
>         joinedCollection.apply(
>             ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag, assetTag)));
>
>     // The following line returns false:
>     output.hasSchema();
>
>     ...BigQueryIO...
>
> On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik <lc...@google.com> wrote:
>
>> Can you give context as to whether schemas will ever allow recursive
>> types since this is pretty common in lots of languages?
>>
>> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette <bh...@google.com>
>> wrote:
>>
>>> It just occurred to me that BEAM-10265 [1] could be the cause of the
>>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>>> schemas are not allowed to be recursive, and it looks like we don't fail
>>> gracefully for recursive proto definitions.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>>
>>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette <bh...@google.com>
>>> wrote:
>>>
>>>> Hm it looks like the error is from trying to call the zero-arg
>>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>>> registered for ArticleEnvelope?
>>>>
>>>> I think what may be happening is that Beam finds there's no schema
>>>> registered for ArticleEnvelope, so it just recursively
>>>> applies JavaFieldSchema, which generates code that attempts to use the
>>>> zero-arg constructor. That looks like a bug in JavaFieldSchema: we
>>>> should fail earlier with a better message rather than generating code
>>>> that will try to access a private constructor. I filed a jira for this [1].
>>>>
>>>> I think you can get this working if you register a Schema for
>>>> ArticleEnvelope. I'm not actually sure of the best way to do this, since
>>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>>> <re...@google.com> and +Alex Van Boxel <al...@vanboxel.be> in case
>>>> they have better advice). You might try just registering a provider
>>>> manually when you create the pipeline, something like
>>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>>> new ProtoMessageSchema())`.
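>>>>
>>>> For example, a minimal sketch (untested):
>>>>
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>     // Register the proto schema before applying any transforms, so it
>>>>     // is found when schemas and coders are inferred later.
>>>>     pipeline
>>>>         .getSchemaRegistry()
>>>>         .registerSchemaProvider(
>>>>             ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());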
>>>>
>>>> Brian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>>
>>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <
>>>> tobias.kaymak@ricardo.ch> wrote:
>>>>
>>>>> A bit more context: I started with the Beam documentation and tried
>>>>> JavaFieldSchema and JavaBeanSchema first; when that didn't work, I dug
>>>>> deeper and tried to implement the methods myself.
>>>>>
>>>>> What I also tried is the following class definition:
>>>>>
>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>> public class EnrichedArticle implements Serializable {
>>>>>
>>>>>   // ArticleEnvelope is generated from Protobuf
>>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>>   // Asset is a Java POJO
>>>>>   @Nullable public List<Asset> assets;
>>>>>
>>>>>   @SchemaCreate
>>>>>   public EnrichedArticle() {}
>>>>>
>>>>>   @SchemaCreate
>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>> List<Asset> assets) {
>>>>>     this.article = article;
>>>>>     this.assets = assets;
>>>>>   }
>>>>> }
>>>>>
>>>>> This throws the following exception:
>>>>>
>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>>> ...
>>>>> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>>>   at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>>>   at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>>>>   at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>>>>   at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>>>>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>>>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>>>   at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>>>   at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>>>>>
>>>>>
>>>>> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias <
>>>>> tobias.kaymak@ricardo.ch> wrote:
>>>>>
>>>>>> Hi Brian,
>>>>>>
>>>>>> Thank you for your response.
>>>>>>
>>>>>> 1. When I annotate the class with
>>>>>> @DefaultSchema(JavaFieldSchema.class) and my constructor with
>>>>>> @SchemaCreate, I get the following exception:
>>>>>>
>>>>>> Caused by: java.lang.IllegalAccessError: tried to access method ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V from class ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>>>>>>   at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)
>>>>>>
>>>>>> 2. When I annotate the class with
>>>>>> @DefaultSchema(JavaBeanSchema.class), make the fields private, and
>>>>>> generate getters/setters, I get a StackOverflowError:
>>>>>>
>>>>>> java.lang.StackOverflowError
>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>>>>>>   at org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>>>>>>   at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>>>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>>>>>>   at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>>>>>>   at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>>   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>>>>>>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>>>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>>>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>>>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>>>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>>>   at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>>>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>>>>>>   at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>>>>>> [...]
>>>>>>
>>>>>> 2.1 When I make the fields public, the pipeline executes, but the
>>>>>> PCollection does not have a schema associated with it, which causes the
>>>>>> next pipeline step (BigQueryIO) to fail.
>>>>>>
>>>>>> I want to try AutoValue as well, but that requires some more changes
>>>>>> to my code.
>>>>>>
>>>>>> - I tried supplying ProtoMessageSchema().toRowFunction and
>>>>>> ProtoMessageSchema().schemaFor() to the pipeline for the Protobuf
>>>>>> conversion
>>>>>> - I tried writing my own toRow/fromRow/getSchema functions for
>>>>>> EnrichedArticle and supplying those to the pipeline
>>>>>>
>>>>>> Where can I put the breakpoints to get a better understanding of what
>>>>>> is happening here?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tobias,
>>>>>>>
>>>>>>> You should be able to annotate the EnrichedArticle class with an
>>>>>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>>>>>> need to make some tweaks to the class though to be compatible with the
>>>>>>> built-in schema providers: you could make the members public and use
>>>>>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
>>>>>>> it into an AutoValue and use AutoValueSchema.
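>>>>>>>
>>>>>>> For the AutoValue option, that could look something like this (an
>>>>>>> untested sketch; AutoValue_EnrichedArticle is the name of the class
>>>>>>> that AutoValue generates):
>>>>>>>
>>>>>>>     @DefaultSchema(AutoValueSchema.class)
>>>>>>>     @AutoValue
>>>>>>>     public abstract class EnrichedArticle {
>>>>>>>       public abstract ArticleProto.ArticleEnvelope getArticle();
>>>>>>>       public abstract List<Asset> getAssets();
>>>>>>>
>>>>>>>       @SchemaCreate
>>>>>>>       public static EnrichedArticle create(
>>>>>>>           ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>>>>         return new AutoValue_EnrichedArticle(article, assets);
>>>>>>>       }
>>>>>>>     }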
>>>>>>>
>>>>>>> Once you do that you should be able to convert a
>>>>>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
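>>>>>>>
>>>>>>> i.e. roughly (untested; the table spec is a placeholder, and
>>>>>>> useBeamSchema() assumes you want BigQueryIO to derive the table
>>>>>>> schema from the Beam schema):
>>>>>>>
>>>>>>>     PCollection<Row> rows = enrichedArticles.apply(Convert.toRows());
>>>>>>>     rows.apply(
>>>>>>>         BigQueryIO.<Row>write()
>>>>>>>             .to("my-project:my_dataset.my_table")
>>>>>>>             .useBeamSchema());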
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> [1]
>>>>>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias <
>>>>>>> tobias.kaymak@ricardo.ch> wrote:
>>>>>>>
>>>>>>>> I have the following class definition:
>>>>>>>>
>>>>>>>> public class EnrichedArticle implements Serializable {
>>>>>>>>
>>>>>>>>   // ArticleEnvelope is generated via Protobuf
>>>>>>>>   private ArticleProto.ArticleEnvelope article;
>>>>>>>>   // Asset is a Java POJO
>>>>>>>>   private List<Asset> assets;
>>>>>>>>
>>>>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>>>>> List<Asset> assets) {
>>>>>>>>     this.article = article;
>>>>>>>>     this.assets = assets;
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I am trying to generate a SerializableFunction<EnrichedArticle,
>>>>>>>> Row> and a Schema for it so that I can pass it easily to my
>>>>>>>> BigQueryIO at the end of my pipeline. Transforming the article to a Row
>>>>>>>> object is straightforward:
>>>>>>>>
>>>>>>>> First I get the toRow() function for it via the helper:
>>>>>>>>
>>>>>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>>>>>       ArticleProto.ArticleEnvelope.class));
>>>>>>>>
>>>>>>>> Then I just apply that function to the article field.
>>>>>>>> However, I don't know how I can manually transform my list of assets
>>>>>>>> (a simple Java POJO annotated with
>>>>>>>> @DefaultSchema(JavaFieldSchema.class))
>>>>>>>> in my EnrichedArticle container/composition class. What's the
>>>>>>>> recommended way of doing this?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Hi Brian,

Thank you for opening the issue! My current workaround is to generate a
BigQuery schema with helper functions I already have (since my sink writes
to BigQuery in the end). The Beam schema functions are still in the code,
but I currently don't use them, as I couldn't make them work in time for a
company-internal demo. (So I am basically following your advice and
avoiding ProtoMessageSchema.)

Best,
Tobi
