Posted to user@beam.apache.org by Jimmy Headdon <ji...@mojiworks.com> on 2022/04/15 08:14:01 UTC

[Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Hello

I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to v2.37.0
(Java 8 & Maven), but have run into an issue with a breaking change that
I'd appreciate some support with.  Sorry this is quite a long one; I wanted
to capture as much context as I could, but please shout if there's anything
you'd like to dig into.

I'd note that I've also raised this on StackOverflow, if you find it easier
to read the Markdown there - https://stackoverflow.com/q/71875593/18805546.

I'm using Beam inside GCP Dataflow to read data from BigQuery, then
processing aggregates before I write the results back to BigQuery.  I'm
able to read from/write to BigQuery without issue, but after the upgrade my
pipeline to calculate aggregates is failing at runtime, specifically a
`DoFn` I have written to sanitise the results returned from the Beam
`SqlTransform.query` command.  I call this function within `ParDo.of` to
detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling MIN/MAX
aggregates in Beam SQL returns the Double min/max values when it encounters
a `NULL` value, rather than just returning NULL.  I did try filtering the
initial BigQuery raw data results, but this issue creeps in at the Beam SQL
level.
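
For illustration, the only SQL-side guard I can think of is wrapping each
aggregate in the query itself, along these lines (standard SQL, with
`experimentValue` as a stand-in column name - I haven't tested this in my
actual queries):

    select
        case when count(experimentValue) = 0 then null
             else max(experimentValue)
        end as experimentValue
    from PCOLLECTION

That avoids the sentinel values entirely, but clutters every aggregate
expression, hence the DoFn approach below.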

There may be better ways to do this (I'm open to suggestions!).  I've
included a bunch of code snippets from my pipeline that I've tried to
simplify, so apologies if there's anything obviously janky.  Here's what I
previously had before the library upgrade:

    PCollection<Row> aggregates = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
    )
    .apply(ParDo.of(new HandleNullValues()));

I've included the `HandleNullValues` definition at the bottom of this
email, but it appears v2.21.0 introduced a breaking change whereby the
coder inference was disabled for Beam Row types in [this ticket](
https://issues.apache.org/jira/browse/BEAM-9569).  This change has caused
the above code to fail with the following runtime error:

> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. Unable to return a default Coder for
> ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output
> [PCollection@83398426]. Correct one of the following root causes:
> [ERROR]   No Coder has been manually specified;  you may do so using
> .setCoder(). [ERROR]   Inferring a Coder from the CoderRegistry
> failed: Cannot provide a coder for a Beam Row. Please provide a schema
> instead using PCollection.setRowSchema. [ERROR]   Using the default
> output Coder from the producing PTransform failed:
> PTransform.getOutputCoder called.

I've followed the advice on the aforementioned JIRA ticket, plus a bunch of
other examples I found online, but without much joy.  I've tried applying
`setCoder(SerializableCoder.of(Row.class))` after the `.apply(ParDo.of(new
HandleNullValues()))`, which fixes this error (though I'm not yet sure if
it's just suppressed the error, or if it's actually working), but that
change causes another runtime error:

> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. Cannot call getSchema when there is no schema -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
> execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
> (default-cli) on project dataflow-example: An exception occured while
> executing the Java class. Cannot call getSchema when there is no
> schema

This error is thrown further down my pipeline, when I perform a subsequent
`SqlTransform.query` to JOIN some results together.

    PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
        .and(new TupleTag<Row>("Experiments"), experiments)
        .apply("Joining Aggregates to Experiments",
            SqlTransform.query(aggregateExperimentJoin()))
        .apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
        .apply(BigQueryIO.writeTableRows()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .to(NestedValueProvider.of(options.getProjectId(),
                (SerializableFunction<String, String>) projectId ->
                    projectId + ":daily_aggregates.experiments")));

I've verified the `aggregates` collection is indeed missing a schema if I
interrogate the `hasSchema` property.  The second `experiments` PCollection
above does have a row schema set though:

    PCollection<Row> rawExperiments = rows.apply(
        SqlTransform.query("select sessionId, experiments from PCOLLECTION")
    );
    PCollection<Row> experiments = rawExperiments.apply(
        ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema)));
    experiments.setRowSchema(bigQuerySchema);
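
That is, checking both at pipeline construction time (a trimmed check of
mine, not part of the pipeline itself):

    System.out.println(aggregates.hasSchema());   // false after the ParDo
    System.out.println(experiments.hasSchema());  // true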

I've also tried applying this coder at the pipeline level, with different
variations on the following.  But this also gives the same error:

    CoderRegistry cr = pipeline.getCoderRegistry();
    cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
    cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));

The `bigQuerySchema` object referenced above is the initial schema used to
retrieve all raw data from BigQuery, though that part of the pipeline works
fine, so potentially I need to pass the `aggregatesSchema` object (see
below) into `registerCoderForType` for the pipeline?
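
For concreteness, the variation I have in mind would simply be (assuming
`aggregatesSchema`, defined further below, is built before the registry is
configured):

    cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(aggregatesSchema));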

I then tried to set the row schema on `aggregates` (which was another
suggestion in the error above).  I've confirmed that calling `setCoder` is
responsible for the previous `Row` schema disappearing, where it had
previously been set by the input PCollection (and also if I call
`setRowSchema` immediately before applying the `DoFn`).

I've simplified the schema for succinctness in this post, but it's a subset
of `bigQuerySchema` with a few new fields (simple data types).  Here's what
I've tried, again with various combinations of where I call `setCoder` and
`setRowSchema` (before `apply()` and/or after):

    Schema aggregatesSchema = Schema.builder()
        .addNullableField("userId", FieldType.STRING)
        .addNullableField("sessionId", FieldType.STRING)
        .addNullableField("experimentsPerDay", FieldType.INT64)
        .build();

    PCollection<Row> aggregates = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
    )
    .apply(ParDo.of(new HandleNullValues()))
    .setCoder(SerializableCoder.of(Row.class))
    .setRowSchema(aggregatesSchema);

Unfortunately, this causes a third runtime error, which I've not been able
to figure out:

> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. java.lang.IllegalStateException -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
> execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
> (default-cli) on project dataflow-example: An exception occured while
> executing the Java class. java.lang.IllegalStateException

The full call stack is at the bottom of this email, and I can see it
originating from my `HandleNullValues` `DoFn`, but after that it disappears
into the Beam libraries.

I'm at a loss as to which route is recommended, and how to proceed, as both
coder and schema options are causing different issues.

Any help would be greatly appreciated, and thanks for your efforts on this
project!


The full `DoFn` I've referred to is further below, but it's worth noting
that just having an essentially empty `DoFn` with both input and output of
Beam `Row` types causes the same issue:

    public static class HandleNullValues extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            c.output(row);
        }
    }

Here's the full implementation, if anyone can think of a better way to
detect and replace `NULL` values returned from Beam SQL:

    public static class HandleNullValues extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            List<String> fields = row.getSchema().getFieldNames();
            Builder rowBuilder = Row.withSchema(row.getSchema());

            for (String f : fields) {
                Object value = row.getValue(f);
                // instanceof is false for null, so no separate null check is needed
                if (value instanceof Long) {
                    Long longVal = row.getInt64(f);
                    if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
                        rowBuilder.addValue(null);
                    } else {
                        rowBuilder.addValue(value);
                    }
                } else if (value instanceof Double) {
                    Double doubleVal = row.getDouble(f);
                    if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
                        rowBuilder.addValue(null);
                    } else {
                        rowBuilder.addValue(value);
                    }
                } else {
                    rowBuilder.addValue(value);
                }
            }

            Row newRow = rowBuilder.build();
            c.output(newRow);
        }
    }
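
For comparison, the tidiest alternative I've sketched so far keeps the
sentinel logic in a plain static helper (`sanitiseRow` is just my own name
for it, and `sqlOutput` stands for the result of the SqlTransform), declares
the Row output type via `MapElements`, and re-attaches the schema explicitly.
Untested against v2.37.0, so very much a sketch:

    // Declare the Row output type up front, then re-attach the schema,
    // since Beam can't infer it through the lambda.
    PCollection<Row> sanitised = sqlOutput
        .apply("Handle Null Values", MapElements
            .into(TypeDescriptors.rows())
            .via((Row row) -> sanitiseRow(row)))
        .setRowSchema(aggregatesSchema);

    // Replaces Long/Double sentinel values with null, field by field.
    static Row sanitiseRow(Row row) {
        Row.Builder builder = Row.withSchema(row.getSchema());
        for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
            Object value = row.getValue(i);
            boolean longSentinel = value instanceof Long
                && (((Long) value) == Long.MAX_VALUE || ((Long) value) == Long.MIN_VALUE);
            boolean doubleSentinel = value instanceof Double
                && (((Double) value) == Double.MAX_VALUE || ((Double) value) == Double.MIN_VALUE);
            builder.addValue(longSentinel || doubleSentinel ? null : value);
        }
        return builder.build();
    }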

And here's the full call stack from the `setRowSchema` issue detailed above:


> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. java.lang.IllegalStateException -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
> execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
> (default-cli) on project dataflow-example: An exception occured while
> executing the Java class. java.lang.IllegalStateException
>     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
(MojoExecutor.java:306)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:211)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:165)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:157)
>     at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
> (LifecycleModuleBuilder.java:121)
>     at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
> (LifecycleModuleBuilder.java:81)
>     at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
> (SingleThreadedBuilder.java:56)
>     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:127)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
> (Launcher.java:282)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
> (Launcher.java:406)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347) Caused by:
> org.apache.maven.plugin.MojoExecutionException: An exception occured
> while executing the Java class. java.lang.IllegalStateException
>     at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:311)
>     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
(DefaultBuildPluginManager.java:137)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
(MojoExecutor.java:301)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:211)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:165)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:157)
>     at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
> (LifecycleModuleBuilder.java:121)
>     at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
> (LifecycleModuleBuilder.java:81)
>     at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
> (SingleThreadedBuilder.java:56)
>     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:127)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
> (Launcher.java:282)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
> (Launcher.java:406)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347) Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalStateException
>     at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (DirectRunner.java:373)
>     at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (DirectRunner.java:341)
>     at org.apache.beam.runners.direct.DirectRunner.run
(DirectRunner.java:218)
>     at org.apache.beam.runners.direct.DirectRunner.run
(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>     at java.lang.Thread.run (Thread.java:748) Caused by:
java.lang.IllegalStateException
>     at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
> (Preconditions.java:491)
>     at
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate
> (RowCoderGenerator.java:314)
>     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
(Unknown Source)
>     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
(Unknown Source)
>     at org.apache.beam.sdk.schemas.SchemaCoder.encode
(SchemaCoder.java:124)
>     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream
(CoderUtils.java:85)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
(CoderUtils.java:69)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
(CoderUtils.java:54)
>     at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
>     at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>
> (MutationDetectors.java:118)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
(MutationDetectors.java:49)
>     at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
> (ImmutabilityCheckingBundleFactory.java:115)
>     at
org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output
> (ParDoEvaluator.java:305)
>     at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue
> (SimpleDoFnRunner.java:268)
>     at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900
> (SimpleDoFnRunner.java:84)
>     at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
> (SimpleDoFnRunner.java:416)
>     at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
> (SimpleDoFnRunner.java:404)
>     at com.example.dataflow.Pipeline$HandleNullValues.processElement
(CustomFunctions.java:310)


Cheers!

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Jimmy Headdon <ji...@mojiworks.com>.
Amazing, thank you for your patience and support with this, Andrew.  I can
see that you've tagged that JIRA item for the v2.39.0 release - do you know
when that release is scheduled to ship, by any chance?

Thanks also for your explanation of setting coders and schemas -
unfortunately I'm still unable to get a simple example working where
DoFn<Row, Row> is used.  I've created the GitHub Gist below, which initialises
a collection of three Row objects, performs a basic Beam SQL aggregation,
and finally passes the Row elements through a DoFn where ideally I'd
like to create new Row objects, but for now it does nothing.  Any time I
use a DoFn<Row, Row> my output PCollection loses its row schema, even if I
pass through the original Row object, or if I create a new Row object and
explicitly call withSchema.

https://gist.github.com/jimmyheaddon/ce1185b83e4ef30e3f9d6fa1b9b16455
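
In case it saves a click, the core of the Gist looks roughly like this
(hand-trimmed, so treat it as a sketch - `PassThrough` is just my name for
the no-op DoFn<Row, Row>):

    Schema schema = Schema.builder()
        .addNullableField("value", FieldType.INT64)
        .build();

    PCollection<Row> input = pipeline
        .apply(Create.of(
            Row.withSchema(schema).addValue(1L).build(),
            Row.withSchema(schema).addValue(2L).build(),
            Row.withSchema(schema).addValue(3L).build()))
        .setRowSchema(schema);

    PCollection<Row> output = input
        .apply(SqlTransform.query("select max(value) as value from PCOLLECTION"))
        .apply(ParDo.of(new PassThrough()));  // just calls c.output(c.element())

    System.out.println(output.hasSchema());  // false - the SqlTransform schema is gone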

You'll see I call getSchema at the bottom of the Gist, which causes the
exception "Cannot call getSchema when there is no schema", as the schema
which existed before the DoFn has been lost.  However, if I try to set the
row schema again I instead get an IllegalStateException with no message, so even
if I create the Row objects inside the DoFn with a schema, the resultant
PCollection has lost it, and I'm unable to set it again, which seems like a
bug?

Thanks again, and let me know if you'd like me to test anything else.

On Mon, 18 Apr 2022 at 20:18, Andrew Pilloud <ap...@google.com> wrote:

> I was able to reproduce this with all the inputs being null, thanks for
> that pointer. I filed https://issues.apache.org/jira/browse/BEAM-14321
> and will look into a fix today.
>
> From your example it looks like the input to SqlTransform doesn't have a
> schema; it is using a simple coder which doesn't provide the metadata
> SqlTransform needs to operate on the data. (Also SqlTransform calls
> setRowSchema internally, so you shouldn't need to call it on the output.)
>
> At a high level, setCoder is saying "here is a method that can serialize
> my Java object", setRowSchema is saying "here is metadata about my row (so
> you can serialize it)", and setSchema is saying "here is metadata about
> my Java object and a method to convert my Java object to a row (so you can
> serialize it)". The Schema methods are newer and provide more metadata that
> enables things like SqlTransform to operate on your data.
> Calling setRowSchema is required when you are working with Row directly.
> Generally you shouldn't call setSchema directly as the schema can be
> inferred from other types:
> https://beam.apache.org/documentation/programming-guide/#inferring-schemas
> (I don't believe we infer schemas from simple types today, you need a pojo.)
>
> Andrew
>
> On Sun, Apr 17, 2022 at 4:36 AM Jimmy Headdon <ji...@mojiworks.com>
> wrote:
>
>> Hi Brian
>>
>> This is the shortest pipeline Gist I can come up with to demonstrate
>> "java.lang.IllegalStateException: Cannot call getSchema when there is no
>> schema".  You'll see I've tried setRowSchema and setCoder, but with the
>> same end result.  Any chance you can advise where I'm going wrong, as I
>> wanted to setup a simple pipeline for Andrew on the NULL aggregation
>> results.
>>
>> https://gist.github.com/jimmyheaddon/f0350a29f69c745e31c442942874eb12
>>
>>
>> Thanks again
>>
>> On Sat, 16 Apr 2022 at 09:08, Jimmy Headdon <ji...@mojiworks.com>
>> wrote:
>>
>>> Thanks Andrew, out of interest does your test pass if all of the input
>>> values to the MIN/MAX aggregates are NULL?  If I have a BigQuery column
>>> that contains entirely NULL values, and I then convert them from TableRow
>>> to Row types, they're still showing correctly as NULL if I perform a simple
>>> System.out.println:
>>>
>>> rowsFromBigQuery.apply(ParDo.of(new PrintParDo())).setCoder(SerializableCoder.of(Row.class));
>>> "NULL"
>>> "NULL"
>>> "NULL"....
>>>
>>> If I then apply the following Beam SQL on this PCollection:
>>>
>>> select max(experimentValue) as experimentValue
>>> from PCOLLECTION
>>>
>>> Then the results come back as -9223372036854775808,
>>> or 9223372036854775807 if you use MIN().
>>>
>>> Hopefully I'm doing something silly and it's an easy fix, let me know if
>>> there's anything you'd like me to try.
>>>
>>>
>>> Cheers
>>>
>>>
>>> On Fri, 15 Apr 2022 at 21:20, Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> Are you sure the min/max values are coming from SqlTransform? I wrote a
>>>> quick test in Beam (using Integer, but all types have the same null
>>>> wrapper) and the nulls were dropped.
>>>>
>>>> More detail: I added the following test case
>>>> to BeamSqlDslAggregationNullableTest on the latest Beam. The input values
>>>> for f_int1 are 1, null, 2, null, null, null, 3. The test passed (the
>>>> return value being 1), which indicates we are dropping nulls before
>>>> aggregation. (I don't believe this is actually correct behavior, we should
>>>> be returning null?)
>>>>
>>>>  @Test
>>>>  public void testMin() {
>>>>    String sql = "SELECT min(f_int1) FROM PCOLLECTION";
>>>>
>>>>  PAssert.that(boundedInput.apply(SqlTransform.query(sql))).satisfies(matchesScalar(1));
>>>>    pipeline.run();
>>>>  }
>>>>
>>>> On Fri, Apr 15, 2022 at 11:37 AM Jimmy Headdon <
>>>> jimmy.headdon@mojiworks.com> wrote:
>>>>
>>>>> Thanks for the swift response Brian, Andrew.  I've tried your
>>>>> suggestion Brian, and sadly I get the same error as the lengthy call
>>>>> stack from the end of my original post (IllegalStateException) - it appears
>>>>> the PCollection might have been finalised by the DoFn, and therefore I
>>>>> cannot setRowSchema against it?  In the fully implemented version I
>>>>> captured in my original post you can see I call withSchema when creating
>>>>> the Row objects, though interestingly the cut-down version I also posted
>>>>> gives the same error, even though it's passing the input row to the
>>>>> output without mutating it?
>>>>>
>>>>> Regarding the NULL values from Beam SQL aggregations, I've re-run my
>>>>> pipeline with my NullValueHandler commented out, and unfortunately I can
>>>>> still see min and max integers being written back to BigQuery.  Is there
>>>>> anything you'd like me to test to get you some further feedback?
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <ap...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Beam SQL's null aggregation behavior changed radically in 2.34.0.
>>>>>> https://github.com/apache/beam/pull/15174
>>>>>>
>>>>>> I think we drop them now?
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jimmy,
>>>>>>>
>>>>>>> Sorry about this - I wonder if this error message could be more
>>>>>>> helpful?
>>>>>>> You're right that the issue is that the output PCollection produced
>>>>>>> by HandleNullValues doesn't have a schema attached to it. Beam has no way
>>>>>>> of inferring the output schema through the opaque DoFn. A quick solution
>>>>>>> might be to just propagate the schema from the SQL output:
>>>>>>>
>>>>>>>     PCollection<Row> sqlOutput = inputCollection.apply(
>>>>>>>         "Generate Aggregates",
>>>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>>>     );
>>>>>>>
>>>>>>>     PCollection<Row> aggregates = sqlOutput.apply(ParDo.of(new
>>>>>>>         HandleNullValues())).setRowSchema(inputCollection.getSchema());
>>>>>>>
>>>>>>> @Reuven Lax <re...@google.com> may have some other ideas.
>>>>>>>
>>>>>>> Stepping back to the reason you need to add HandleNullValues: "I
>>>>>>> call this function within `ParDo.of` to detect `Double.MAX_VALUE` and
>>>>>>> `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL
>>>>>>> returns the Double min/max values when it encounters a `NULL` value, rather
>>>>>>> than just returning NULL."
>>>>>>> @Andrew Pilloud <ap...@google.com> is this intended? Do you know
>>>>>>> if there's any way to modify this behavior?
>>>>>>>
>>>>>>> Brian
>>>>>>>         .build();
>>>>>>>
>>>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>>>         "Generate Aggregates",
>>>>>>>
>>>>>>> SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>>>     )
>>>>>>>     .apply(ParDo.of(new HandleNullValues()))
>>>>>>>     .setCoder(SerializableCoder.of(Row.class))
>>>>>>>     .setRowSchema(aggregatesSchema);
>>>>>>>
>>>>>>> Unfortunately, this causes a third runtime error which I've not been
>>>>>>> able to figure out:
>>>>>>>
>>>>>>> > [ERROR] Failed to execute goal
>>>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>>>> > project dataflow-example: An exception occured while executing the
>>>>>>> > Java class. java.lang.IllegalStateException -> [Help 1]
>>>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>>>> > (default-cli) on project dataflow-example: An exception occured
>>>>>>> while
>>>>>>> > executing the Java class. java.lang.IllegalStateException
>>>>>>>
>>>>>>> The full call stack is at the bottom of this email, and I can see it
>>>>>>> originating from my `HandleNullValues` `DoFn`, but after that it disappears
>>>>>>> into the Beam libraries.
>>>>>>>
>>>>>>> I'm at a loss as to which route is recommended, and how to proceed,
>>>>>>> as both coder and schema options are causing different issues.
>>>>>>>
>>>>>>> Any help would be greatly appreciated, and thanks for your efforts
>>>>>>> on this project!
>>>>>>>
>>>>>>>
>>>>>>> The full `DoFn` I've referred to is further below, but it's worth
>>>>>>> noting that just having an essentially empty `DoFn` with both input and
>>>>>>> output of Beam `Row` types causes the same issue:
>>>>>>>
>>>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>>>         @ProcessElement
>>>>>>>         public void processElement(ProcessContext c) {
>>>>>>>             Row row = c.element();
>>>>>>>             c.output(row);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> Here's the full implementation, if anyone can think of a better way
>>>>>>> to detect and replace `NULL` values returned from Beam SQL:
>>>>>>>
>>>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>>>         @ProcessElement
>>>>>>>         public void processElement(ProcessContext c) {
>>>>>>>             Row row = c.element();
>>>>>>>             List<String> fields = row.getSchema().getFieldNames();
>>>>>>>             Builder rowBuilder = Row.withSchema(row.getSchema());
>>>>>>>
>>>>>>>             for (String f: fields) {
>>>>>>>                 Object value = row.getValue(f);
>>>>>>>                 if (value != null && value instanceof Long) {
>>>>>>>                     Long longVal = row.getInt64(f);
>>>>>>>                     if (longVal == Long.MAX_VALUE || longVal ==
>>>>>>> Long.MIN_VALUE) {
>>>>>>>                         rowBuilder.addValue(null);
>>>>>>>                     } else {
>>>>>>>                         rowBuilder.addValue(value);
>>>>>>>                     }
>>>>>>>                 } else if (value != null && value instanceof Double)
>>>>>>> {
>>>>>>>                     Double doubleVal = row.getDouble(f);
>>>>>>>                     if (doubleVal == Double.MAX_VALUE || doubleVal
>>>>>>> == Double.MIN_VALUE) {
>>>>>>>                         rowBuilder.addValue(null);
>>>>>>>                     } else {
>>>>>>>                         rowBuilder.addValue(value);
>>>>>>>                     }
>>>>>>>                 } else {
>>>>>>>                     rowBuilder.addValue(value);
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             Row newRow = rowBuilder.build();
>>>>>>>             c.output(newRow);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> And here's the full callstack from the `setRowSchema` issue detailed
>>>>>>> above:
>>>>>>>
>>>>>>>
>>>>>>> > [ERROR] Failed to execute goal
>>>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>>>> > project dataflow-example: An exception occured while executing the
>>>>>>> > Java class. java.lang.IllegalStateException -> [Help 1]
>>>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>>>> > (default-cli) on project dataflow-example: An exception occured
>>>>>>> while
>>>>>>> > executing the Java class. java.lang.IllegalStateException
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>>>> (MojoExecutor.java:306)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:211)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:165)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:157)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>>>> > (LifecycleModuleBuilder.java:121)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>>>> > (LifecycleModuleBuilder.java:81)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>>>> > (SingleThreadedBuilder.java:56)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>>>> (LifecycleStarter.java:127)
>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>>>> (DefaultMaven.java:294)
>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>>>> (DefaultMaven.java:192)
>>>>>>> >     at org.apache.maven.DefaultMaven.execute
>>>>>>> (DefaultMaven.java:105)
>>>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>>>> (NativeMethodAccessorImpl.java:62)
>>>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>>>> >     at
>>>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>>>> > (Launcher.java:282)
>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>>>> (Launcher.java:225)
>>>>>>> >     at
>>>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>>>> > (Launcher.java:406)
>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>>>> (Launcher.java:347) Caused by:
>>>>>>> > org.apache.maven.plugin.MojoExecutionException: An exception
>>>>>>> occured
>>>>>>> > while executing the Java class. java.lang.IllegalStateException
>>>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo.execute
>>>>>>> (ExecJavaMojo.java:311)
>>>>>>> >     at
>>>>>>> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
>>>>>>> (DefaultBuildPluginManager.java:137)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>>>> (MojoExecutor.java:301)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:211)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:165)
>>>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>>>> (MojoExecutor.java:157)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>>>> > (LifecycleModuleBuilder.java:121)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>>>> > (LifecycleModuleBuilder.java:81)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>>>> > (SingleThreadedBuilder.java:56)
>>>>>>> >     at
>>>>>>> org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>>>> (LifecycleStarter.java:127)
>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>>>> (DefaultMaven.java:294)
>>>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>>>> (DefaultMaven.java:192)
>>>>>>> >     at org.apache.maven.DefaultMaven.execute
>>>>>>> (DefaultMaven.java:105)
>>>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>>>> (NativeMethodAccessorImpl.java:62)
>>>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>>>> >     at
>>>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>>>> > (Launcher.java:282)
>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>>>> (Launcher.java:225)
>>>>>>> >     at
>>>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>>>> > (Launcher.java:406)
>>>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>>>> (Launcher.java:347) Caused by:
>>>>>>> > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>>>> > java.lang.IllegalStateException
>>>>>>> >     at
>>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>>>> > (DirectRunner.java:373)
>>>>>>> >     at
>>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>>>> > (DirectRunner.java:341)
>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>>>> (DirectRunner.java:218)
>>>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>>>> (DirectRunner.java:67)
>>>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>>>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>>>>>>> >     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>>>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run
>>>>>>> (ExecJavaMojo.java:254)
>>>>>>> >     at java.lang.Thread.run (Thread.java:748) Caused by:
>>>>>>> java.lang.IllegalStateException
>>>>>>> >     at
>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>>>> > (Preconditions.java:491)
>>>>>>> >     at
>>>>>>> org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate
>>>>>>> > (RowCoderGenerator.java:314)
>>>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>>>> (Unknown Source)
>>>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>>>> (Unknown Source)
>>>>>>> >     at org.apache.beam.sdk.schemas.SchemaCoder.encode
>>>>>>> (SchemaCoder.java:124)
>>>>>>> >     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream
>>>>>>> (CoderUtils.java:85)
>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>>>> (CoderUtils.java:69)
>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>>>> (CoderUtils.java:54)
>>>>>>> >     at org.apache.beam.sdk.util.CoderUtils.clone
>>>>>>> (CoderUtils.java:144)
>>>>>>> >     at
>>>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>
>>>>>>> > (MutationDetectors.java:118)
>>>>>>> >     at
>>>>>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
>>>>>>> (MutationDetectors.java:49)
>>>>>>> >     at
>>>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
>>>>>>> > (ImmutabilityCheckingBundleFactory.java:115)
>>>>>>> >     at
>>>>>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output
>>>>>>> > (ParDoEvaluator.java:305)
>>>>>>> >     at
>>>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue
>>>>>>> > (SimpleDoFnRunner.java:268)
>>>>>>> >     at
>>>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900
>>>>>>> > (SimpleDoFnRunner.java:84)
>>>>>>> >     at
>>>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>>>> > (SimpleDoFnRunner.java:416)
>>>>>>> >     at
>>>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>>>> > (SimpleDoFnRunner.java:404)
>>>>>>> >     at
>>>>>>> com.example.dataflow.Pipeline$HandleNullValues.processElement
>>>>>>> (CustomFunctions.java:310)
>>>>>>>
>>>>>>>
>>>>>>> Cheers!
>>>>>>>
>>>>>>

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Jimmy Headdon <ji...@mojiworks.com>.
Hi Brian

This is the shortest pipeline Gist I can come up with to demonstrate
"java.lang.IllegalStateException: Cannot call getSchema when there is no
schema".  You'll see I've tried both setRowSchema and setCoder, but with
the same end result.  Any chance you can advise where I'm going wrong?  I
wanted to set up a simple pipeline for Andrew to demonstrate the NULL
aggregation results.

https://gist.github.com/jimmyheaddon/f0350a29f69c745e31c442942874eb12
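
In case the Gist link rots, it's roughly this shape - a minimal sketch
rather than the exact Gist contents, with a placeholder schema and field
name:

    Schema schema = Schema.builder()
        .addNullableField("experimentValue", Schema.FieldType.INT64)
        .build();

    Pipeline p = Pipeline.create();

    PCollection<Row> input = p.apply(
        Create.of(Row.withSchema(schema).addValue(1L).build())
            .withRowSchema(schema));

    PCollection<Row> output = input
        .apply(SqlTransform.query(
            "select max(experimentValue) as experimentValue from PCOLLECTION"))
        .apply(ParDo.of(new HandleNullValues()));

    // Tried each of these in turn; both still end in
    // "Cannot call getSchema when there is no schema" at runtime:
    output.setRowSchema(schema);
    // output.setCoder(SerializableCoder.of(Row.class));

    p.run().waitUntilFinish();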


Thanks again


Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Jimmy Headdon <ji...@mojiworks.com>.
Thanks Andrew - out of interest, does your test pass if all of the input
values to the MIN/MAX aggregates are NULL?  If I have a BigQuery column
that contains entirely NULL values, and I then convert them from TableRow
to Row types, they still show correctly as NULL when I print them with a
simple System.out.println:

rowsFromBigQuery.apply(ParDo.of(new PrintParDo()))
    .setCoder(SerializableCoder.of(Row.class));
"NULL"
"NULL"
"NULL"...

If I then apply the following Beam SQL on this PCollection:

select max(experimentValue) as experimentValue
from PCOLLECTION

Then the results come back as -9223372036854775808 (Long.MIN_VALUE), or
9223372036854775807 (Long.MAX_VALUE) if you use MIN().
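
As a stopgap I'm considering guarding at the SQL level instead of in a
DoFn, on the assumption that count(col) ignores NULLs (standard SQL
behaviour, though I haven't verified it in Beam SQL yet) - something like:

    PCollection<Row> maxValue = rowsFromBigQuery.apply(
        SqlTransform.query(
            // If every experimentValue is NULL, count(experimentValue) is 0,
            // so return NULL explicitly rather than the MIN/MAX sentinel.
            "select case when count(experimentValue) = 0 then null "
                + "else max(experimentValue) end as experimentValue "
                + "from PCOLLECTION"));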

Hopefully I'm doing something silly and it's an easy fix - let me know if
there's anything you'd like me to try.


Cheers


On Fri, 15 Apr 2022 at 21:20, Andrew Pilloud <ap...@google.com> wrote:

> Are you sure the min/max values are coming from SqlTransform? I wrote a
> quick test in Beam (using Integer, but all types have the same null
> wrapper) and the nulls were dropped.
>
> More detail: I added the following test case
> to BeamSqlDslAggregationNullableTest on the latest Beam. The input values
> for f_int1 are 1, null, 2, null, null, null, 3. The test passed (the
> return value being 1), which indicates we are dropping nulls before
> aggregation. (I don't believe this is actually correct behavior - should
> we be returning null?)
>
>  @Test
>  public void testMin() {
>    String sql = "SELECT min(f_int1) FROM PCOLLECTION";
>    PAssert.that(boundedInput.apply(SqlTransform.query(sql)))
>        .satisfies(matchesScalar(1));
>    pipeline.run();
>  }
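>
> (For the all-NULL input case, a variant along these lines would exercise
> it - a sketch only: `boundedInputAllNulls` is a hypothetical fixture where
> every f_int1 value is null, not something in the current test class, and
> the usual JUnit static imports are assumed.)
>
>  @Test
>  public void testMinAllNulls() {
>    String sql = "SELECT min(f_int1) FROM PCOLLECTION";
>    // The interesting question is whether this yields a single null row,
>    // or an Integer.MIN_VALUE/MAX_VALUE sentinel.
>    PAssert.that(boundedInputAllNulls.apply(SqlTransform.query(sql)))
>        .satisfies(
>            rows -> {
>              for (Row row : rows) {
>                assertNull(row.getValue(0));
>              }
>              return null;
>            });
>    pipeline.run();
>  }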
>
> On Fri, Apr 15, 2022 at 11:37 AM Jimmy Headdon <
> jimmy.headdon@mojiworks.com> wrote:
>
>> Thanks for the swift responses, Brian and Andrew.  I've tried your
>> suggestion, Brian, and sadly I get the same error as the lengthy call
>> stack at the end of my original post (IllegalStateException) - it appears
>> the PCollection might have been finalised by the DoFn, so I cannot call
>> setRowSchema against it?  In the fully implemented version in my original
>> post you can see I call withSchema when creating the Row objects, though
>> interestingly the cut-down version I also posted gives the same error,
>> even though it passes the input row to the output without mutating it.
>>
>> Regarding the NULL values from Beam SQL aggregations, I've re-run my
>> pipeline with my HandleNullValues DoFn commented out, and unfortunately I
>> can still see min and max integers being written back to BigQuery.  Is
>> there anything you'd like me to test to get you some further feedback?
>>
>> Thanks again!
>>
>> On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <ap...@google.com> wrote:
>>
>>> Beam SQL's null aggregation behavior changed radically in 2.34.0.
>>> https://github.com/apache/beam/pull/15174
>>>
>>> I think we drop them now?
>>>
>>> Andrew
>>>
>>>
>>> On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <bh...@google.com>
>>> wrote:
>>>
>>>> Hi Jimmy,
>>>>
>>>> Sorry about this - I wonder if this error message could be more helpful?
>>>> You're right that the issue is that the output PCollection produced by
>>>> HandleNullValues doesn't have a schema attached to it. Beam has no way of
>>>> inferring the output schema through the opaque DoFn. A quick solution might
>>>> be to just propagate the schema from the SQL output:
>>>>
>>>>     PCollection<Row> sqlOutput = inputCollection.apply(
>>>>         "Generate Aggregates",
>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>     );
>>>>
>>>>     // Reattach the schema of the SQL output, since the DoFn is opaque
>>>>     // to Beam's schema inference.
>>>>     PCollection<Row> aggregates = sqlOutput
>>>>         .apply(ParDo.of(new HandleNullValues()))
>>>>         .setRowSchema(sqlOutput.getSchema());
>>>>
>>>> @Reuven Lax <re...@google.com> may have some other ideas.
>>>>
>>>> Stepping back to the reason you need to add HandleNullValues: "I call
>>>> this function within `ParDo.of` to detect `Double.MAX_VALUE` and
>>>> `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL
>>>> returns the Double min/max values when it encounters a `NULL` value, rather
>>>> than just returning NULL."
>>>> @Andrew Pilloud <ap...@google.com> is this intended? Do you know if
>>>> there's any way to modify this behavior?
>>>>
>>>> Brian
>>>>
>>>> On Fri, Apr 15, 2022 at 1:14 AM Jimmy Headdon <
>>>> jimmy.headdon@mojiworks.com> wrote:
>>>>
>>>>> And here's the full callstack from the `setRowSchema` issue discussed
>>>>> earlier in the thread:
>>>>>
>>>>>
>>>>> > [ERROR] Failed to execute goal
>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>> > project dataflow-example: An exception occured while executing the
>>>>> > Java class. java.lang.IllegalStateException -> [Help 1]
>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>> > (default-cli) on project dataflow-example: An exception occured while
>>>>> > executing the Java class. java.lang.IllegalStateException
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>> (MojoExecutor.java:306)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:211)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:165)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:157)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:121)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:81)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>> > (SingleThreadedBuilder.java:56)
>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>> (LifecycleStarter.java:127)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:294)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:192)
>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>> (NativeMethodAccessorImpl.java:62)
>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>> > (Launcher.java:282)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>> (Launcher.java:225)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>> > (Launcher.java:406)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>> (Launcher.java:347) Caused by:
>>>>> > org.apache.maven.plugin.MojoExecutionException: An exception occured
>>>>> > while executing the Java class. java.lang.IllegalStateException
>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo.execute
>>>>> (ExecJavaMojo.java:311)
>>>>> >     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
>>>>> (DefaultBuildPluginManager.java:137)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>> (MojoExecutor.java:301)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:211)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:165)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:157)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:121)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:81)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>> > (SingleThreadedBuilder.java:56)
>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>> (LifecycleStarter.java:127)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:294)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:192)
>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>> (NativeMethodAccessorImpl.java:62)
>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>> > (Launcher.java:282)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>> (Launcher.java:225)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>> > (Launcher.java:406)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>> (Launcher.java:347) Caused by:
>>>>> > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>> > java.lang.IllegalStateException
>>>>> >     at
>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>> > (DirectRunner.java:373)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>> > (DirectRunner.java:341)
>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>> (DirectRunner.java:218)
>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>> (DirectRunner.java:67)
>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>>>>> >     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run
>>>>> (ExecJavaMojo.java:254)
>>>>> >     at java.lang.Thread.run (Thread.java:748) Caused by:
>>>>> java.lang.IllegalStateException
>>>>> >     at
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>> > (Preconditions.java:491)
>>>>> >     at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate
>>>>> > (RowCoderGenerator.java:314)
>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>> (Unknown Source)
>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>> (Unknown Source)
>>>>> >     at org.apache.beam.sdk.schemas.SchemaCoder.encode
>>>>> (SchemaCoder.java:124)
>>>>> >     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream
>>>>> (CoderUtils.java:85)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>> (CoderUtils.java:69)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>> (CoderUtils.java:54)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.clone
>>>>> (CoderUtils.java:144)
>>>>> >     at
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>
>>>>> > (MutationDetectors.java:118)
>>>>> >     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
>>>>> (MutationDetectors.java:49)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
>>>>> > (ImmutabilityCheckingBundleFactory.java:115)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output
>>>>> > (ParDoEvaluator.java:305)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue
>>>>> > (SimpleDoFnRunner.java:268)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900
>>>>> > (SimpleDoFnRunner.java:84)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>> > (SimpleDoFnRunner.java:416)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>> > (SimpleDoFnRunner.java:404)
>>>>> >     at com.example.dataflow.Pipeline$HandleNullValues.processElement
>>>>> (CustomFunctions.java:310)
>>>>>
>>>>>
>>>>> Cheers!
>>>>>
>>>>

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Andrew Pilloud <ap...@google.com>.
Are you sure the min/max values are coming from SqlTransform? I wrote a
quick test in Beam (using Integer, but all types have the same null
wrapper) and the nulls were dropped.

More detail: I added the following test case
to BeamSqlDslAggregationNullableTest on the latest Beam. The input values
for f_int1 are 1, null, 2, null, null, null, 3. The test passed (the
return value being 1), which indicates we are dropping nulls before
aggregation. (I don't believe this is actually correct behavior; shouldn't
we be returning null?)

    @Test
    public void testMin() {
      String sql = "SELECT min(f_int1) FROM PCOLLECTION";

      PAssert.that(boundedInput.apply(SqlTransform.query(sql)))
          .satisfies(matchesScalar(1));
      pipeline.run();
    }
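
I haven't run the symmetric MAX case, but it should mirror the test above -
a sketch only, reusing the same boundedInput fixture and matchesScalar
helper (3 is the largest non-null input value):

    @Test
    public void testMax() {
      // Sketch: if nulls are dropped before aggregation, MAX over
      // {1, null, 2, null, null, null, 3} should be 3.
      String sql = "SELECT max(f_int1) FROM PCOLLECTION";

      PAssert.that(boundedInput.apply(SqlTransform.query(sql)))
          .satisfies(matchesScalar(3));
      pipeline.run();
    }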

On Fri, Apr 15, 2022 at 11:37 AM Jimmy Headdon <ji...@mojiworks.com>
wrote:

> [quoted reply chain trimmed]

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Jimmy Headdon <ji...@mojiworks.com>.
Thanks for the swift response, Brian and Andrew.  I've tried your
suggestion, Brian, and sadly I get the same error as the lengthy call stack
at the end of my original post (IllegalStateException) - it appears the
PCollection might have been finalised by the DoFn, and therefore I cannot
call setRowSchema against it?  In the fully implemented version I captured
in my original post you can see I call withSchema when creating the Row
objects; interestingly, the cut-down version I also posted gives the same
error, even though it passes the input row to the output without mutating
it.
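
One thought: since HandleNullValues rebuilds each Row with row.getSchema(),
the output elements presumably still carry the SQL output schema rather
than the input's, so perhaps the schema I attach needs to be that one.  A
sketch of what I could try next, reusing the sqlOutput name from Brian's
snippet (untested):

    PCollection<Row> aggregates = sqlOutput
        .apply(ParDo.of(new HandleNullValues()))
        // Guess: attach the schema the emitted Rows actually carry,
        // i.e. the SQL output's schema rather than the SQL input's.
        .setRowSchema(sqlOutput.getSchema());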

Regarding the NULL values from Beam SQL aggregations, I've re-run my
pipeline with my NullValueHandler commented out, and unfortunately I can
still see min and max integer values being written back to BigQuery.  Is
there anything you'd like me to test to give you further feedback?
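
In the meantime, one thing I can do to narrow down where the sentinel
values first appear is to dump whatever SqlTransform emits before my own
DoFn touches anything - a throwaway sketch (again reusing Brian's
sqlOutput name):

    // Diagnostic only: print the raw SQL output so we can see whether the
    // Long.MIN_VALUE / Long.MAX_VALUE sentinels originate in SqlTransform
    // itself or somewhere further downstream.
    sqlOutput
        .apply(ParDo.of(new DoFn<Row, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println(c.element());
                c.output(c.element());
            }
        }))
        .setRowSchema(sqlOutput.getSchema());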

Thanks again!

On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <ap...@google.com> wrote:

> [quoted messages trimmed]

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Andrew Pilloud <ap...@google.com>.
Beam SQL's null aggregation behavior changed radically in 2.34.0.
https://github.com/apache/beam/pull/15174

I think we drop them now?

Andrew


On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <bh...@google.com> wrote:

> Hi Jimmy,
>
> Sorry about this - I wonder if this error message could be more helpful.
> You're right that the issue is that the output PCollection produced by
> HandleNullValues doesn't have a schema attached to it. Beam has no way of
> inferring the output schema through the opaque DoFn. A quick solution might
> be to just propagate the schema from the SQL output:
>
>     PCollection<Row> sqlOutput = inputCollection.apply(
>         "Generate Aggregates",
>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>     );
>
>     PCollection<Row> aggregates = sqlOutput
>         .apply(ParDo.of(new HandleNullValues()))
>         .setRowSchema(inputCollection.getSchema());
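>
> A quick way to confirm the schema actually took effect (just a sketch,
> using the hasSchema check you already mentioned):
>
>     // Should print true once the row schema is attached; check this
>     // before pipeline.run().
>     System.out.println(aggregates.hasSchema());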
>
> @Reuven Lax <re...@google.com> may have some other ideas.
>
> Stepping back to the reason you need to add HandleNullValues: "I call this
> function within `ParDo.of` to detect `Double.MAX_VALUE` and
> `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL
> returns the Double min/max values when it encounters a `NULL` value, rather
> than just returning NULL."
> @Andrew Pilloud <ap...@google.com> is this intended? Do you know if
> there's any way to modify this behavior?
>
> Brian

Re: [Question] Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

Posted by Brian Hulette <bh...@google.com>.
Hi Jimmy,

Sorry about this; I wonder if this error message could be more helpful.
You're right that the issue is that the output PCollection produced by
HandleNullValues doesn't have a schema attached to it. Beam has no way of
inferring the output schema through the opaque DoFn. A quick solution might
be to propagate the schema from the SQL output:

    PCollection<Row> sqlOutput = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
    );

    PCollection<Row> aggregates = sqlOutput
        .apply(ParDo.of(new HandleNullValues()))
        .setRowSchema(sqlOutput.getSchema());
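
For reference, here's a minimal, self-contained sketch of the same pattern
(the schema, field names, and class name are illustrative rather than taken
from your pipeline): any opaque DoFn<Row, Row> yields a schema-less output
PCollection, so the upstream schema has to be re-attached explicitly.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.FieldType;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class RowSchemaPropagation {
        public static void main(String[] args) {
            // Illustrative schema, standing in for the SQL output schema.
            Schema schema = Schema.builder()
                .addNullableField("sessionId", FieldType.STRING)
                .addNullableField("score", FieldType.DOUBLE)
                .build();

            Pipeline pipeline = Pipeline.create();

            // Create can't infer a Row coder either, so the schema is
            // spelled out here as well.
            PCollection<Row> rows = pipeline.apply(
                Create.of(Row.withSchema(schema).addValues("abc", 1.0).build())
                    .withRowSchema(schema));

            // A pass-through DoFn<Row, Row> produces a schema-less output...
            PCollection<Row> output = rows
                .apply(ParDo.of(new DoFn<Row, Row>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(c.element());
                    }
                }))
                // ...so the upstream schema is re-attached explicitly.
                .setRowSchema(rows.getSchema());

            pipeline.run().waitUntilFinish();
        }
    }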

@Reuven Lax <re...@google.com> may have some other ideas.
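
(For what it's worth, setRowSchema is essentially shorthand for attaching a
schema-aware coder, so the equivalent setCoder call would be something like
the sketch below, assuming the aggregatesSchema from your post.
SerializableCoder doesn't help here because it carries no schema, which is
why the downstream SqlTransform then fails with "Cannot call getSchema when
there is no schema".)

    // Roughly equivalent to aggregates.setRowSchema(aggregatesSchema):
    // RowCoder carries the schema, unlike SerializableCoder.of(Row.class).
    aggregates.setCoder(RowCoder.of(aggregatesSchema));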

Stepping back to the reason you need to add HandleNullValues: "I call this
function within `ParDo.of` to detect `Double.MAX_VALUE` and
`Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL
returns the Double min/max values when it encounters a `NULL` value, rather
than just returning NULL."
@Andrew Pilloud <ap...@google.com> is this intended? Do you know if
there's any way to modify this behavior?
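
In the meantime, one possible workaround (untested, and assuming the
sentinel values really are Double.MAX_VALUE/Double.MIN_VALUE as you
describe) would be to map them back to NULL inside the query itself with
standard-SQL NULLIF, which avoids the schema-erasing DoFn entirely. The
query below is a hypothetical stand-in for generateAggregates.sql, with an
invented score column:

    PCollection<Row> aggregates = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(
            "SELECT sessionId, "
                // Sentinel reportedly produced by MIN over NULL-only input
                + "NULLIF(MIN(score), 1.7976931348623157E308) AS minScore, "
                // Sentinel reportedly produced by MAX over NULL-only input
                + "NULLIF(MAX(score), 4.9E-324) AS maxScore "
                + "FROM PCOLLECTION GROUP BY sessionId"));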

Brian
