You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Léopold Boudard (Jira)" <ji...@apache.org> on 2020/04/01 18:20:00 UTC

[jira] [Commented] (BEAM-9656) Reading from pubsub in portable FlinkRunner

    [ https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073061#comment-17073061 ] 

Léopold Boudard commented on BEAM-9656:
---------------------------------------

I've seen very similar issues, 

[http://mail-archives.apache.org/mod_mbox/beam-dev/201909.mbox/%3Ccceb21da-13df-4bf5-b8f3-2b78bdf598ec@apache.org%3E]

hence I guess there might be a problem reading from pubsub from the job server (since the job is never submitted to the cluster it seems), though I don't really know how I coud fix this, since I already pass GOOGLE_APPLICATION_CREDENTIALS with proper rights I guess my problem comes from elsewhere.

> Reading from pubsub in portable FlinkRunner
> -------------------------------------------
>
>                 Key: BEAM-9656
>                 URL: https://issues.apache.org/jira/browse/BEAM-9656
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0
>         Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
>            Reporter: Léopold Boudard
>            Priority: Major
>
> Hi,
> I'm trying to get streaming with pubsub in flinkrunner working, though I get following issue on a dummy test pipeline
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSourcejava.lang.IllegalArgumentException: unable to deserialize UnboundedSource at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74) at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126) at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507) at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472) at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250) at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120) at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113) at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84) at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:844)Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474) at org.xerial.snappy.Snappy.uncompress(Snappy.java:513) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99) at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59) at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68) ... 14 moreERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>             p | 'LoadJson' >> beam.io.ReadFromPubSub(
>                     topic=known_args.input_topic
>             )
>             | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted on a flink cluster with following params:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} --flink_version 1.9  --output gs://... --input_topic projects/pubsub-public-data/topics/taxirides-realtime  --streaming{code}
> I've tried same on both DirectRunner and DataflowRunner and it seems to work. I don't quite understand the underlying error on traceback.
> Could you advise on this issue please?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)