Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:08:15 UTC

[GitHub] [beam] kennknowles opened a new issue, #19066: Use Beam schema in ParquetIO

kennknowles opened a new issue, #19066:
URL: https://github.com/apache/beam/issues/19066

   It would be better if we eliminated the need for avro.schema (infer it? obtain it from the PCollection?) and used org.apache.beam.sdk.schemas.Schema instead.
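
   For context, here is a rough sketch of the boilerplate this would remove (the AvroUtils helpers used below are an assumption on my side; their package has moved between Beam releases): even with a schema-aware PCollection, a user today still has to derive and thread through an Avro schema before ParquetIO.sink can be used.

   ```
   // Sketch only - relies on Beam's AvroUtils bridge between Beam and Avro schemas.
   PCollection<Row> rows = ...;  // already carries a Beam Schema
   org.apache.beam.sdk.schemas.Schema beamSchema = rows.getSchema();
   org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);

   rows
       // Convert each Row to a GenericRecord so ParquetIO.sink can consume it.
       .apply("To GenericRecord",
               MapElements.into(TypeDescriptor.of(GenericRecord.class))
                       .via((Row row) -> AvroUtils.toGenericRecord(row)))
       .setCoder(AvroCoder.of(GenericRecord.class, avroSchema))
       .apply("Write Parquet",
               FileIO.<GenericRecord>write()
                       .via(ParquetIO.sink(avroSchema))
                       .to("/path/to/output")
                       .withSuffix(".parquet"));
   ```

   If ParquetIO accepted org.apache.beam.sdk.schemas.Schema directly (or inferred it from the input PCollection), the toAvroSchema/toGenericRecord bridging above would not be needed.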
   
   Link to discussion on user@: [https://lists.apache.org/thread.html/1d270884aa9e6d7952857](https://lists.apache.org/thread.html/1d270884aa9e6d7952857203522f67dea22195edca631df33f7c054d@%3Cuser.beam.apache.org%3E)
   
   Imported from Jira [BEAM-4812](https://issues.apache.org/jira/browse/BEAM-4812). Original Jira may contain additional context.
   Reported by: ŁukaszG.



Re: [I] Use Beam schema in ParquetIO [beam]

Posted by "psolomin (via GitHub)" <gi...@apache.org>.
psolomin commented on issue #19066:
URL: https://github.com/apache/beam/issues/19066#issuecomment-1752180280

   I would like to ask a follow-up question on this: since `ParquetIO` is hard-coded to `GenericRecord`, it seems practically unusable for logical types, because I could not find a coder for `GenericRecord` that actually supports Avro logical types. Am I mistaken?
   
   Related issue: https://github.com/apache/beam/issues/18874
   Related PR which adds support for logical types: https://github.com/apache/beam/pull/26320
   
   Here's an example of a pipeline which reads Avro and writes Parquet files:
   
   ```
   ... reading byte arrays from source ...
   
   .apply("Parse payloads", ParDo.of(new ConsumedEventDeserializer()))
   .setCoder(AvroCoder.of(GenericRecord.class, ConsumedEvent.SCHEMA$))
   .apply(
           "Sink to S3",
           FileIO.<GenericRecord>write()
                   .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                           .withCompressionCodec(CompressionCodecName.SNAPPY))
                   .to(opts.getSinkLocation())
                   .withNaming(new NoColonFileNaming(runId)));
   ```
   
   But it produces an exception:
   
   ```
   Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
   	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:160)
   	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:81)
   	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
   	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
   	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
   	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
   	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
   	at org.apache.beam.sdk.extensions.avro.coders.AvroCoder.encode(AvroCoder.java:378)
   	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:132)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
   	at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
   	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
   	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
   	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
   	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
   	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
   	at com.psolomin.consumer.ConsumedEventDeserializer.processElement(ConsumedEventDeserializer.java:34)
   
   ```
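
   For anyone else hitting the same trace: AvroCoder.of(GenericRecord.class, ...) encodes with a plain GenericDatumWriter, and the default GenericData model has no Conversion registered for timestamp logical types, so the java.time.Instant field value falls through to writeWithoutConversion and fails the cast to Number. One way to get past the coder (sketch only - the field name and accessor below are made up for illustration) is to populate the GenericRecord with the logical type's underlying primitive instead of the Java time object:

   ```
   // Inside the deserializer's @ProcessElement method (illustrative names only):
   GenericRecord record = new GenericData.Record(ConsumedEvent.SCHEMA$);
   // timestamp-millis is backed by an Avro long, so write epoch millis directly
   // instead of a java.time.Instant, which the plain GenericDatumWriter cannot convert.
   record.put("created_at", parsed.getCreatedAt().toEpochMilli());
   receiver.output(record);
   ```

   This only sidesteps the coder; proper logical-type support is what the PR linked above (#26320) is about.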


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org