Posted to user@beam.apache.org by Pradip Thachile <pr...@thachile.com> on 2020/06/08 20:11:13 UTC

Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

Hey folks, 

I posted this on the Flink user mailing list but didn't get any traction there (possibly because it's Beam related). I've got a Beam/Python pipeline that works on the DirectRunner, and I'm now trying to run it on a local dev Flink cluster. The job fails right out of the gate with an error about being unable to deserialize the UnboundedSource (my PubSub source). I'm not sure how to debug this and would love feedback on how to solve it. A minimal example that reproduces the error is below.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', '--flink_submit_uber_jar', '--streaming']
(Stacktrace below)

#!/usr/bin/env python3
import apache_beam as beam

class DummyPipeline(beam.PTransform):
    def expand(self, p):
        # Read from Pub/Sub and print each element.
        return (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
                topic="<valid topic>")
            | beam.Map(print)
        )

def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    p = beam.Pipeline(options=popts)

    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()

if __name__ == "__main__":
    main()

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
    ... 14 more
ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
[flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
    at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "bin/run-pipeline.py", line 70, in <module>
    main()
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "bin/run-pipeline.py", line 64, in main
    job = runner.run(pipeline=pipeline)
  File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
    result = dag.run()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
    return super(FlinkRunner, self).run_pipeline(pipeline, options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
    result.wait_until_finish()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)


Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

Posted by Maximilian Michels <mx...@apache.org>.
You are using a proprietary connector which only works on Dataflow. You
will have to use io.external.gcp.pubsub.ReadFromPubSub instead. PubSub
support from Python is still experimental.
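
Something along these lines should work (untested sketch: the parameter
names mirror the regular ReadFromPubSub transform, so double-check them
against the apache_beam.io.external.gcp.pubsub module of your Beam version):

#!/usr/bin/env python3
import apache_beam as beam
# Cross-language (external) Pub/Sub read; it runs the Java connector via an
# expansion service instead of the Dataflow-only Python implementation.
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

class DummyPipeline(beam.PTransform):
    def expand(self, p):
        return (
            p
            | "Read from PS" >> ReadFromPubSub(topic="<valid topic>")
            | beam.Map(print)
        )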

-Max

On 09.06.20 06:40, Pradip Thachile wrote:
> Quick update: this test code works just fine on Dataflow as well as the DirectRunner. Looks like the FlinkRunner is problematic for some reason here.

Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

Posted by Pradip Thachile <pr...@thachile.com>.
Quick update: this test code works just fine on Dataflow as well as the DirectRunner. Looks like the FlinkRunner is problematic for some reason here.
