Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 20:50:33 UTC

[GitHub] [beam] damccorm opened a new issue, #20978: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails to decode some Avro logical types

damccorm opened a new issue, #20978:
URL: https://github.com/apache/beam/issues/20978

   For example, when PubsubIO.readAvroGenericRecord is used with an Avro schema that includes a decimal type with a non-zero scale, decoding can fail with the following exception:
   
   
   ```
   
   org.apache.avro.AvroTypeException: Cannot encode decimal with scale 17 as scale 0
   	at org.apache.avro.Conversions$DecimalConversion.toBytes(Conversions.java:92)
   	at org.apache.beam.sdk.schemas.utils.AvroUtils.genericFromBeamField(AvroUtils.java:975)
   	at org.apache.beam.sdk.schemas.utils.AvroUtils.toGenericRecord(AvroUtils.java:397)
   	at org.apache.beam.sdk.schemas.utils.AvroUtils$RowToGenericRecordFn.apply(AvroUtils.java:547)
   	at org.apache.beam.sdk.schemas.utils.AvroUtils$RowToGenericRecordFn.apply(AvroUtils.java:538)
   	at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:123)
   	at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.decode(TableRowInfoCoder.java:64)
   	at org.apache.beam.sdk.io.gcp.bigquery.TableRowInfoCoder.decode(TableRowInfoCoder.java:30)
   	at org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:112)
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
   	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
   	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
   	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
   	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
   	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   
   Imported from Jira [BEAM-12256](https://issues.apache.org/jira/browse/BEAM-12256). Original Jira may contain additional context.
   Reported by: bhulette.
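
The exception message in the trace above suggests a mismatch between the scale of the decoded BigDecimal (17) and the scale of the decimal type it is being written against (0). The following is a minimal, dependency-free sketch of that kind of check, not the Avro or Beam implementation: the class `DecimalScaleSketch` and its `toBytes` helper are hypothetical stand-ins, and the sample value is made up for illustration.

```java
import java.math.BigDecimal;

// Hypothetical stand-in for a decimal encoder with a fixed schema scale.
// It only mirrors the behaviour implied by the exception message above.
final class DecimalScaleSketch {

    // Encodes the unscaled value as big-endian two's-complement bytes,
    // but only if the value's scale matches the scale the schema declares.
    static byte[] toBytes(BigDecimal value, int schemaScale) {
        if (value.scale() != schemaScale) {
            throw new IllegalArgumentException(
                "Cannot encode decimal with scale " + value.scale()
                    + " as scale " + schemaScale);
        }
        return value.unscaledValue().toByteArray();
    }

    public static void main(String[] args) {
        // Made-up sample value with 17 digits after the decimal point.
        BigDecimal value = new BigDecimal("1.23456789012345678");

        // Encoding against the scale the value actually has succeeds.
        byte[] encoded = toBytes(value, 17);
        System.out.println("encoded " + encoded.length + " bytes at scale 17");

        // Encoding against a schema that declares scale 0 is rejected.
        try {
            toBytes(value, 0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the failure depends only on which scale the encoder is given, which is why a repro should check the scale in the schema used at decode time against the scale in the original Avro schema.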


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on issue #20978: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails to decode some Avro logical types

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #20978:
URL: https://github.com/apache/beam/issues/20978#issuecomment-1248002484

   If you have a repro, you can verify by disabling the Pub/Sub override; to do that, set the enable_custom_pubsub_source experiment:
   https://github.com/xsm110/Apache-Beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L315




[GitHub] [beam] kennknowles commented on issue #20978: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails to decode some Avro logical types

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #20978:
URL: https://github.com/apache/beam/issues/20978#issuecomment-1246029781

   FYI @johnjcasey. I think it is fine to treat this as a P2 feature request to make this functionality usable, as long as we fail rather than corrupt data.




[GitHub] [beam] scwhittle commented on issue #20978: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails to decode some Avro logical types

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #20978:
URL: https://github.com/apache/beam/issues/20978#issuecomment-1248000367

   I don't think so. The internal implementation just replaces PubsubUnboundedSource, which comes before the ParseFn.
   I'm not even sure this bug is really Pub/Sub-specific rather than just a coder issue with SchemaCoder/AvroCoder compatibility.
   
   Unless the bug is just that PubsubIO is doing something wrong in the implementation of that method:
   
       Schema schema = AvroUtils.getSchema(GenericRecord.class, avroSchema);
       AvroCoder<GenericRecord> coder = AvroCoder.of(GenericRecord.class, avroSchema);
       return Read.newBuilder(parsePayloadUsingCoder(coder))
           .setCoder(
               SchemaCoder.of(
                   schema,
                   TypeDescriptor.of(GenericRecord.class),
                   AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
                   AvroUtils.getFromRowFunction(GenericRecord.class)))
           .build();
   




[GitHub] [beam] johnjcasey commented on issue #20978: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails to decode some Avro logical types

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on issue #20978:
URL: https://github.com/apache/beam/issues/20978#issuecomment-1246881617

   @scwhittle are you able to identify whether this is an issue with all Pub/Sub IO, or only with the Google-internal Pub/Sub IO?

