Posted to user@beam.apache.org by Cristian Constantinescu <ze...@gmail.com> on 2021/09/10 07:53:14 UTC

Beam Schemas and refactoring code

Hi everyone,

Every article and talk about Beam recommends using Schemas and Row.
However, using Row throughout my pipelines makes the code much harder to
refactor when schemas change, compared to POJOs/beans, which get static
code analysis in the IDE.

Does anyone have any tips or tricks to make refactoring easier when using
Row?

For example: The initial pipeline transforms used a Person schema Row
{firstName:string, lastname:string}. There are some steps that do filtering
and various other things. Now we realize that the Kafka metadata from the
Persons topic is also important, so our Row schema becomes Row {metadata:
Row {..whatever fields..}, item: Row {firstName: string, lastname:string}}.
What would be an easy way to figure out which code changes I need to make to
effectively add an "item." prefix to the fields previously accessed by name?
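
To make the problem concrete, here is a minimal sketch in plain Java. It
does not use any Beam API: nested Maps stand in for Row, and the class and
method names (RowRefactorSketch, Envelope, firstNameByKey, ...) are made up
for illustration. It only shows why a stale string-keyed access survives
compilation while a stale typed access would not.

```java
import java.util.Map;

// Plain-Java stand-in for the refactoring problem. Nested Maps play the
// role of Row; Person/Envelope play the role of POJOs. Hypothetical names.
class RowRefactorSketch {

    // Typed equivalent of the original Person schema.
    static final class Person {
        final String firstName;
        final String lastname;

        Person(String firstName, String lastname) {
            this.firstName = firstName;
            this.lastname = lastname;
        }
    }

    // Typed equivalent of the new {metadata, item} schema.
    static final class Envelope {
        final Map<String, Object> metadata;
        final Person item;

        Envelope(Map<String, Object> metadata, Person item) {
            this.metadata = metadata;
            this.item = item;
        }
    }

    // String-keyed access written against the OLD flat shape.
    // It still compiles after the schema changes.
    static Object firstNameByKey(Map<String, Object> row) {
        return row.get("firstName");
    }

    // String-keyed access updated by hand for the NEW nested shape.
    @SuppressWarnings("unchecked")
    static Object firstNameByPath(Map<String, Object> row) {
        Map<String, Object> item = (Map<String, Object>) row.get("item");
        return item.get("firstName");
    }

    // Typed access: once the parameter type becomes Envelope, a leftover
    // `e.firstName` would be a compile error, so the IDE finds every site.
    static String firstNameTyped(Envelope e) {
        return e.item.firstName;
    }

    public static void main(String[] args) {
        Map<String, Object> flat =
            Map.of("firstName", "Ada", "lastname", "Lovelace");
        Map<String, Object> nested =
            Map.of("metadata", Map.of("partition", 0), "item", flat);

        System.out.println(firstNameByKey(flat));
        // The stale access is only noticed at run time (here it yields null).
        System.out.println(firstNameByKey(nested));
        System.out.println(firstNameByPath(nested));
        System.out.println(
            firstNameTyped(new Envelope(Map.of(), new Person("Ada", "Lovelace"))));
    }
}
```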

Search and replace isn't ideal. And for more complex pipelines it quickly
becomes very difficult to figure out which fields come from which nested
rows and whether they need to change due to refactoring. In fact, the only
way I have found to refactor is to run the pipeline multiple times, analyse
the exact schema at a given line of code, change that code accordingly, and
then repeat the process with the breakpoint a little further down the
pipeline.

One solution I thought of is to keep Row in the signatures of my
PTransforms, but make the first and last steps of those PTransforms
Convert.to(POJO) and Convert.toRows(schema). Basically this provides a
static context inside the transforms. However, most of my schemas are
derived from Avro schemas, and when I add Avro objects to my POJOs,
Convert.to(POJO) throws a StackOverflowError (stack trace [1] at the end of
the email).
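
The shape of that idea, convert at the edges and stay typed in the middle,
can be sketched in plain Java as follows. This is only an illustration of
the pattern under stated assumptions: Maps stand in for Row, the
fromUntyped/toUntyped helpers stand in for the Convert steps, and none of
the names below come from Beam.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of "untyped signature, typed interior". Hypothetical names;
// Map<String, Object> is a stand-in for Row.
class BoundaryConversionSketch {

    static final class Person {
        final String firstName;
        final String lastname;

        Person(String firstName, String lastname) {
            this.firstName = firstName;
            this.lastname = lastname;
        }
    }

    // Entry boundary: the only place that reads fields by name.
    static Person fromUntyped(Map<String, Object> row) {
        return new Person(
            (String) row.get("firstName"), (String) row.get("lastname"));
    }

    // Exit boundary: the only place that writes fields by name.
    static Map<String, Object> toUntyped(Person p) {
        Map<String, Object> row = new HashMap<>();
        row.put("firstName", p.firstName);
        row.put("lastname", p.lastname);
        return row;
    }

    // Typed interior: the compiler and IDE see every field access here.
    static boolean hasLastname(Person p) {
        return p.lastname != null && !p.lastname.isEmpty();
    }

    // The "transform": untyped in, untyped out, typed in between.
    static List<Map<String, Object>> keepNamed(List<Map<String, Object>> rows) {
        return rows.stream()
            .map(BoundaryConversionSketch::fromUntyped)
            .filter(BoundaryConversionSketch::hasLastname)
            .map(BoundaryConversionSketch::toUntyped)
            .collect(Collectors.toList());
    }
}
```

With this layout a schema change touches the two boundary helpers, and
everything between them is checked statically.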

I guess I could mirror the properties of my Avro objects in AutoValue
classes and use those generated AutoValue POJOs inside my PTransforms.
However, that means I have to keep my Avro definitions and my AutoValue
classes in sync.

I understand why Schemas and Row are important, especially after hearing
Andrew talk about them at the 2021 Beam summit [3]. However, using Row
feels a lot like using DataTable [2] in .NET and that brings back
refactoring nightmares.

Thanks,
Cristian

[1]
Exception in thread "main" java.lang.StackOverflowError
at java.base/java.util.HashMap.hash(HashMap.java:339)
at java.base/java.util.HashMap.remove(HashMap.java:794)
at java.base/java.util.HashSet.remove(HashSet.java:236)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:88)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.getTypeMappings(TypeResolver.java:384)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver.covariantly(TypeResolver.java:75)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getCovariantTypeResolver(TypeToken.java:1181)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.resolveSupertype(TypeToken.java:273)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getSupertype(TypeToken.java:398)
at org.apache.beam.sdk.values.TypeDescriptor.getSupertype(TypeDescriptor.java:188)
at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:203)
at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:257)
at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:172)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:67)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:92)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)

[2] https://docs.microsoft.com/en-us/dotnet/api/system.data.datatable?view=net-5.0
[3] https://www.youtube.com/watch?v=4rDZ0b0TOvc

Re: Beam Schemas and refactoring code

Posted by Reuven Lax <re...@google.com>.
Embedding an Avro object inside of a Pojo won't work (at least not today)
for schema inference.

On Fri, Sep 10, 2021 at 9:51 AM Cristian Constantinescu <ze...@gmail.com>
wrote:

> Hi Reuven,
>
> Thank you for the information. I'll try to replace my usages of Row with
> POJOs.
>
> The schema is not recursive at all, or at least it shouldn't be. But some
> properties in the POJO are Avro-generated classes; the other properties
> might be primitives or other POJOs. I'm not sure what properties those
> Avro-generated classes have besides the fields I define. Maybe some
> auto-generated properties are recursive in some weird way.

Re: Beam Schemas and refactoring code

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hi Reuven,

Thank you for the information. I'll try to replace my usages of Row with
POJOs.

The schema is not recursive at all, or at least it shouldn't be. But some
properties in the POJO are Avro-generated classes; the other properties
might be primitives or other POJOs. I'm not sure what properties those
Avro-generated classes have besides the fields I define. Maybe some
auto-generated properties are recursive in some weird way.

On Fri., Sep. 10, 2021, 11:52 Reuven Lax, <re...@google.com> wrote:

> You shouldn't need to use Row - Beam schemas are designed specifically so
> that you can continue using POJOs or beans. If you have a POJO registered
> (with @DefaultSchema(JavaFieldSchema.class)) Beam should automatically
> allow you to use it in place of a Row object.
>
> One thing to keep in mind - we don't currently support recursive schemas.
> Seeing the stack trace you get, I wonder if you have recursive (or
> mutually recursive) fields.
>
> Reuven

Re: Beam Schemas and refactoring code

Posted by Reuven Lax <re...@google.com>.
You shouldn't need to use Row - Beam schemas are designed specifically so
that you can continue using POJOs or beans. If you have a POJO registered
(with @DefaultSchema(JavaFieldSchema.class)) Beam should automatically
allow you to use it in place of a Row object.

One thing to keep in mind - we don't currently support recursive schemas.
Seeing the stack trace you get, I wonder if you have recursive (or
mutually recursive) fields.
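
One way to check a class for this before handing it to schema inference is
a small reflection walk, sketched below. This is not Beam's inference code
and makes simplifying assumptions: it looks only at declared instance
fields (the trace above goes through JavaBeanSchema, which inspects
getters), it treats arrays and anything in a java.* package as a leaf, and
it does not look inside generic type arguments such as List<Node>. All
names in it are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: detect directly or mutually recursive field types by walking
// declared instance fields and watching for a type that is already on
// the current path. Hypothetical helper, not part of Beam.
class RecursiveTypeCheck {

    static final class Node {      // directly recursive
        int value;
        Node next;
    }

    static final class Address {
        String city;
    }

    static final class Person {    // nested, but not recursive
        String name;
        Address address;
    }

    static boolean isRecursive(Class<?> type) {
        return visit(type, new ArrayDeque<>());
    }

    private static boolean visit(Class<?> type, Deque<Class<?>> path) {
        if (isLeaf(type)) {
            return false;
        }
        if (path.contains(type)) {
            return true; // came back to a type we are still expanding
        }
        path.push(type);
        try {
            for (Field f : type.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                    continue; // only instance state is of interest here
                }
                if (visit(f.getType(), path)) {
                    return true;
                }
            }
            return false;
        } finally {
            path.pop();
        }
    }

    // Simplification: primitives, enums, arrays, and JDK types are not expanded.
    private static boolean isLeaf(Class<?> t) {
        return t.isPrimitive() || t.isEnum() || t.isArray()
            || t.getName().startsWith("java.");
    }

    public static void main(String[] args) {
        System.out.println(isRecursive(Node.class));   // true
        System.out.println(isRecursive(Person.class)); // false
    }
}
```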

Reuven
