Posted to github@beam.apache.org by "piter75 (via GitHub)" <gi...@apache.org> on 2023/04/18 16:14:24 UTC

[GitHub] [beam] piter75 opened a new issue, #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

piter75 opened a new issue, #26329:
URL: https://github.com/apache/beam/issues/26329

   ### What happened?
   
   Since #22718, our streaming pipelines (built on Spotify's Scio and running on Google Cloud Dataflow) have been failing with an `AvroCodec` exception while reading data from BigQuery (with `TypedRead`).
   The last released version of Beam that works properly is 2.42.0, and this issue prevents us from upgrading some of our pipelines any further.
   
   We read `GenericRecord`s from a temporary BigQuery table and apply a `parseFn` function to them to create arbitrary (non-Avro) types, which is effectively [Case 3 from the table](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L149-L159) described in `AvroSource.Mode`.
   
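   ```java
   import org.apache.beam.sdk.coders.Coder;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
   import org.apache.beam.sdk.transforms.SerializableFunction;

   // Simplified sketch of the read: T is an arbitrary (non-Avro) type.
   static <T> BigQueryIO.TypedRead<T> read(
       SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> coder) {
     return BigQueryIO.read(parseFn).withCoder(coder);
   }
   ```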
   
   I analysed the issue. It is complex, but the gist is:
   
   1. #22718 adds the ability to use a custom `AvroSource.DatumReaderFactory` implementation for reading from BigQuery;
   2. it creates its "default" / "backwards compatibility" [implementation](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L604-L637) and [uses it](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L664-L678) in `BigQueryIO.read`;
   3. this "default" implementation in fact [uses the `parseFn` function](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L635) (supplied to `BigQueryIO.read`) to return the parsed type from the custom `DatumReader`;
   4. however, it does not (and cannot) propagate the output `Coder` to the `AvroSource` used for reading the data;
   5. Dataflow (in streaming mode) wraps the `AvroSource` in an `UnboundedReadFromBoundedSource` wrapper to use it as an `UnboundedSource`;
   6. along the way, it tries to get the output `Coder` from the underlying `AvroSource` to use it as the `CheckpointCoder` for checkpointing;
   7. `AvroSource` is unaware that `parseFn` [is actually being used, and it returns](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L191-L192) an `AvroCoder` instance, which of course cannot `encode` arbitrary (non-Avro) types.
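
   The chain of steps above can be illustrated with a small, dependency-free toy model. This is only a sketch under stated assumptions: `ToyCoder`, `ToyRecord`, `UserType`, `ToySource` and `checkpoint` are hypothetical stand-ins invented for illustration, not Beam classes, and the real code paths are considerably more involved. It shows a source that applies a parse function inside its reader while still reporting a coder for the unparsed record type, so encoding a parsed element fails.

   ```java
   import java.util.function.Function;

   /** Toy model of the coder mismatch. None of these types are Beam classes. */
   final class CoderMismatchSketch {

     /** Hypothetical stand-in for a Coder: it can only encode one concrete type. */
     static final class ToyCoder {
       private final Class<?> supported;

       ToyCoder(Class<?> supported) {
         this.supported = supported;
       }

       String encode(Object value) {
         if (!supported.isInstance(value)) {
           throw new IllegalArgumentException(
               "cannot encode " + value.getClass().getSimpleName()
                   + " with a coder for " + supported.getSimpleName());
         }
         return value.toString();
       }
     }

     /** Hypothetical stand-in for an Avro record read from the temporary table. */
     static final class ToyRecord {
       final String name;

       ToyRecord(String name) {
         this.name = name;
       }

       @Override
       public String toString() {
         return "ToyRecord(" + name + ")";
       }
     }

     /** Hypothetical stand-in for the user's arbitrary (non-Avro) output type. */
     static final class UserType {
       final String upperName;

       UserType(String upperName) {
         this.upperName = upperName;
       }
     }

     /** Stand-in for the source: parsing happens inside the reader, invisible to the coder. */
     static final class ToySource {
       private final Function<ToyRecord, Object> hiddenParseFn;

       ToySource(Function<ToyRecord, Object> hiddenParseFn) {
         this.hiddenParseFn = hiddenParseFn;
       }

       Object read(ToyRecord record) {
         return hiddenParseFn.apply(record);
       }

       /** Always reports the record coder because the source does not know about the parse step. */
       ToyCoder getOutputCoder() {
         return new ToyCoder(ToyRecord.class);
       }
     }

     /** Stand-in for the wrapper that encodes read elements with the source's own coder. */
     static String checkpoint(ToySource source, ToyRecord input) {
       Object element = source.read(input);
       return source.getOutputCoder().encode(element);
     }
   }
   ```

   In this model, a source whose reader returns the record unchanged checkpoints fine, while a source whose reader returns `UserType` throws as soon as the wrapper tries to encode the element.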
   
   The biggest issue I see is that the contract [that `AvroSource` enforces](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L312-L313) between using `parseFn` and supplying the output `Coder` is broken by moving the responsibility for applying `parseFn` into `GenericDatumTransformer`.
   
   I am thinking about contributing a fix and am considering the following solution:
   
   1. removal of the `BigQueryIO.GenericDatumTransformer`
   2. bringing `parseFn` back to the `BigQuerySourceBase` hierarchy
   3. simplifying the `datumReaderFactory` type to `AvroSource.DatumReaderFactory<T>` and no longer applying `parseFn` in it
   4. adding validation that only one of `parseFn` or `datumReaderFactory` is used - I believe that the purpose of a custom `DatumReader` is to read `SpecificRecord`s and output them without the need for additional parsing.
   5. creating `AvroSource`s according to which parameter was actually provided [in `BigQuerySourceBase.createSources`](https://github.com/apache/beam/blob/86c1ba09761bc2734ae3e666ea8f804405e015eb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L237-L263)
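
   As a rough illustration of points 4 and 5, the validation and the choice of source could look something like the sketch below. It is a dependency-free toy, not a patch: `ReadModeSketch`, `Mode` and `resolveMode` are hypothetical names, the parameters are plain `Object`s standing in for the real types, and it assumes that exactly one of the two options must be set and that `parseFn` must come with an explicit output coder.

   ```java
   /** Toy model of the proposed validation. Names and rules here are assumptions, not Beam code. */
   final class ReadModeSketch {

     /** Which kind of source a createSources-like method would build in this toy model. */
     enum Mode {
       PARSE_FN_WITH_CODER,
       DATUM_READER_FACTORY
     }

     /** Rejects ambiguous configurations and picks the mode from whichever option was provided. */
     static Mode resolveMode(Object parseFn, Object datumReaderFactory, Object outputCoder) {
       boolean hasParseFn = parseFn != null;
       boolean hasFactory = datumReaderFactory != null;
       if (hasParseFn == hasFactory) {
         throw new IllegalArgumentException(
             "exactly one of parseFn or datumReaderFactory must be set");
       }
       if (hasParseFn && outputCoder == null) {
         // Assumed rule: a parse function produces arbitrary types, so a coder must accompany it.
         throw new IllegalArgumentException("parseFn requires an explicit output coder");
       }
       return hasParseFn ? Mode.PARSE_FN_WITH_CODER : Mode.DATUM_READER_FACTORY;
     }
   }
   ```

   Failing fast at configuration time keeps the coder and the parse step together, so the source never ends up reporting a coder for a type it does not emit.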
   
   This will of course add complexity to an already complex process, but it will preserve backwards compatibility in more scenarios.
   
   CC: @steveniemitz @kkdoon
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] liferoad commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1677989059

   R: @ahmedabu98 




[GitHub] [beam] kkdoon commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1533650539

   @kennknowles Yes, correct: #23594 was in response to that comment and to issue [#23541](https://github.com/apache/beam/issues/23541).
   
   @piter75 We are using the Dataflow runner.




[GitHub] [beam] RustedBones commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1677562012

   @kennknowles since you were involved in the conversation, can you have a look at the associated PR?




[GitHub] [beam] piter75 commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "piter75 (via GitHub)" <gi...@apache.org>.
piter75 commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1666236324

   @RustedBones We did introduce a workaround in our pipelines, and then I got caught up in other tasks :(
   I don't foresee having time to address this issue in the next 2-3 weeks.
   I would be grateful if you could take over.




[GitHub] [beam] kkdoon commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1515926101

   Hi @piter75,
   
   Thanks for the detailed explanation. Is there a way to reproduce the bug via a unit test? Also, I'm curious how your streaming job is reading from a bounded source. Would you be able to share more details of your job graph?
   
   I can also take a look at the issue and try to debug it at my end. But overall your approach makes sense to me.
   
   Thanks!




[GitHub] [beam] kennknowles commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1533504134

   Was the comment at https://github.com/apache/beam/pull/22718#issuecomment-1275105750 addressed?




[GitHub] [beam] piter75 commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "piter75 (via GitHub)" <gi...@apache.org>.
piter75 commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1517629623

   Thanks for responding @kkdoon.
   
   Our use case for a `BoundedSource` in a streaming job stems from the fact that we want a hot start whenever we start or restart the job.
   
   Part of our pipeline depends on a join between streams with different event frequencies. On one side there is a stream that delivers thousands of messages per second, and on the other is one that may not have matching messages for a day.
   We solved this frequency mismatch by loading the history of the "slow moving" stream from BigQuery and then making a union with the stream of messages that come straight from the PubSub topic.
   This way we have a union that is both complete from the start and still unbounded during the pipeline run.
   
   Unit testing this issue may be difficult because it is triggered only on a specific runner in a specific run mode, and I would rather not test the behaviour of the BigQuery source with runner tests.
   
   I will try to come up with a reproduction example.
   
   What runner are you using, by the way?




[GitHub] [beam] kennknowles commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1533505943

   It looks like #23594 was a response to it?
   
   CC @steveniemitz 




[GitHub] [beam] liferoad commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1677988906

   R: @Abacn 




[GitHub] [beam] RustedBones commented on issue #26329: [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on issue #26329:
URL: https://github.com/apache/beam/issues/26329#issuecomment-1665376671

   @piter75 are you still working on this? If not, I can give it a shot.

