You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Janek Bevendorff <ja...@uni-weimar.de> on 2021/12/02 13:22:59 UTC

Re: Deployment difficulties with Python apps on Kubernetes Flink cluster

Hi,

I know that there are various options for configuring Flink using Beam 
with the Java SDK, but are there any options to do the same with the 
Python SDK? The FlinkRunnerOptions class offers only a fraction of what 
the Java FlinkPipelineOptions class provides. I would like to be able to 
set the parallelism when I submit an Uber JAR and I would also like to 
be able to set the task retry count.


On 30/11/2021 17:49, Janek Bevendorff wrote:
> Again, one step closer to getting this thing running:
>
> I have to set --artifact_endpoint as well to submit a job to a remote 
> job server, not just --job_endpoint. Would be great if the docs at 
> least mentioned that.
>
> I also don't understand really why there is no single option for 
> setting both the job and artifact endpoint address (without the port 
> number). They must both run in the same container, otherwise I get 
> errors about invalid job staging IDs, so having two options is kind of 
> redundant configuration.
>
> Janek
>
>
> On 30/11/2021 16:08, Janek Bevendorff wrote:
>> Any ideas here? I am a few steps further now, but not quite there yet.
>>
>> The main deployment issue can be solved by sharing only 
>> /tmp/beam-artifact-staging among job and taskmanagers, not the whole 
>> tmp directory (this is knowledge from countless hours of googling and 
>> trying things out, no documentation here whatsover). It does not 
>> solve the remote deployment issue, but so far I am at least able to 
>> submit jobs with a locally running Beam job server.
>>
>> Unfortunately, I am getting random gRPC terminations with this 
>> exception:
>>
>>   File "venv/lib/python3.7/site-packages/apache_beam/pipeline.py", 
>> line 597, in __exit__
>>     self.result.wait_until_finish()
>>   File 
>> "venv/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", 
>> line 600, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline 
>> BeamApp-root-1130144051-d0476877_4d32d3e8-1fbb-4f2d-88d9-c2e05fd624eb 
>> failed in state FAILED: 
>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: 
>> CANCELLED: client cancelled
>>
>> The error occurs randomly without any indication why. Do you have any 
>> idea what may be wrong with the gRPC connection or also what I may be 
>> missing for the remote job server deployment?
>>
>>
>> Besides these questions, I also have a little rant (sorry for that, 
>> but I have to get this off my chest):
>>
>> I am getting a extremely frustrated with the Python documentation, 
>> which is often incomplete, sometimes outdated and occasionally plain 
>> wrong. I can tell that many examples were never actually run, because 
>> they contain invalid Python code, function names differ from what the 
>> API actually offers, parentheses are in the wrong places etc. One 
>> particular example is the splittable DoFn documentation. The original 
>> blog post is entirely outdated and also contains invalid Python code 
>> (missing self parameters of methods and such), but also the online 
>> manual is wrong (missing constructor parameters or required method 
>> overrides here, wrong parentheses there...). To understand how 
>> everything works, I am basically reverse engineering the code, taking 
>> into account the little API documentation there is. This is beyond 
>> annoying.
>>
>> I also noticed that stateful processing is completely broken 
>> apparently. With the local runner, it doesn't run at all (various 
>> exceptions thrown) and with the FlinkRunner or PortableRunner, 
>> BagStateSpecs or other state parameters are always empty and 
>> watermark timers fire after each invocation of process(). I tried 
>> countless potential solutions, but nothing works, so I gave up and 
>> resorted to using a CombineFn-based PTransform instead.
>>
>> Janek
>>
>>
>> On 26/11/2021 17:01, Janek Bevendorff wrote:
>>> I am one step further, but also not really.
>>>
>>> When I mount the shared drive that serves /tmp on the Flink job and 
>>> task managers also on my local machine and then spin up a local Beam 
>>> job server with this volume mounted on /tmp as well, I can get my 
>>> job to start. This is ugly as hell, because it requires so many 
>>> extra steps, but at least it's progress.
>>>
>>> Unfortunately, the job doesn't run properly and fails with
>>>
>>>   File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 
>>> 462, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> ModuleNotFoundError: No module named 'XXXX'
>>>
>>> where XXX is the my application module that I deploy with 
>>> --setup_file. When I download the workflow.tar.gz from the staging 
>>> directory, I can confirm that the module is present.
>>>
>>> This isn't working as intended at all. Also what happens if multiple 
>>> users submit applications at the same time? All the Beam stuff in 
>>> /tmp has random names, but the stages/workflow.tar.gz file that is 
>>> provided for the Python SDK sidecar container has the same name for 
>>> each job. Hence it would be impossible to serve multiple users with 
>>> this setup.
>>>
>>> Janek
>>>
>>>
>>> On 26/11/2021 15:32, Janek Bevendorff wrote:
>>>> Hi,
>>>>
>>>> Currently, I am struggling with getting Beam to run on a 
>>>> Kubernetes-hosted Flink cluster and there is very little to no 
>>>> documentation on how to resolve my deployment issues (besides a few 
>>>> Stackoverflow threads without solutions).
>>>>
>>>> I have a Flink job server running on Kubernetes that creates new 
>>>> taskmanager pods from a pod template when I submit a job. Each 
>>>> taskmanager pod has a sidecar container running the Beam Python SDK 
>>>> image.
>>>>
>>>> With this setup in place, I tried multiple methods to submit a 
>>>> Python Beam job and all of them fail for different reasons:
>>>>
>>>> 1) Run my Python job via the FlinkRunner and set 
>>>> --environment_type=EXTERNAL
>>>>
>>>> This works perfectly fine locally, but fails when I set 
>>>> --flink_master to the Kubernetes load balancer IP to submit to the 
>>>> remote Kubernetes cluster. It  allows me to submit the job itself 
>>>> successfully, but not the necessary staging data. The Flink 
>>>> container shows
>>>>
>>>> java.io.FileNotFoundException: 
>>>> /tmp/beam-temp7hxxe2gs/artifacts2liu9b8y/779b17e6efab2bbfcba170762d1096fe2451e0e76c4361af9a68296f23f4a4ec/1-ref_Environment_default_e-workflow.tar.gz 
>>>> (No such file or directory)
>>>>
>>>> and the Python worker shows
>>>>
>>>> 2021/11/26 14:16:24 Failed to retrieve staged files: failed to 
>>>> retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for 
>>>> /tmp/staged/workflow.tar.gz
>>>>         caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for 
>>>> /tmp/staged/workflow.tar.gz
>>>>         caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for 
>>>> /tmp/staged/workflow.tar.gz
>>>>         caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for 
>>>> /tmp/staged/workflow.tar.gz
>>>>         caused by:
>>>> rpc error: code = Unknown desc =
>>>>
>>>> I found a Stackoverflow issue with the exact same issue, but 
>>>> without a solution. The file seems to exist only under /tmp on my 
>>>> local client machine, which is useless.
>>>>
>>>> 2) Submit the job with --flink_submit_uber_jar=True
>>>>
>>>> This will submit the staging information correctly, but I cannot 
>>>> set the amount of parallelism. Instead I get the following warning:
>>>>
>>>> WARNING:apache_beam.options.pipeline_options:Discarding invalid 
>>>> overrides: {'parallelism': 100}
>>>>
>>>> and the job runs with only a single worker (useless as well).
>>>>
>>>> 3) Spawn another job manager sidecar container running the Beam job 
>>>> server and submit via the PortableRunner
>>>>
>>>> This works (somewhat) when I run the job server image locally with 
>>>> --network=host, but I cannot get it to work on Kubernetes. I 
>>>> exposed the ports 8097-8099 on the load balancer IP, but when I 
>>>> submit a job, I only get
>>>>
>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous 
>>>> of RPC that terminated with:
>>>>         status = StatusCode.UNAVAILABLE
>>>>         details = "failed to connect to all addresses"
>>>>         debug_error_string = 
>>>> "{"created":"@1637934464.499518882","description":"Failed to pick 
>>>> subchannel","file":"src/core
>>>> /ext/filters/client_channel/client_channel.cc","file_line":3158,"referenced_errors":[{"created":"@1637934464.499518362","de 
>>>>
>>>> scription":"failed to connect to all 
>>>> addresses","file":"src/core/lib/transport/error_utils.cc","file_line":147,"grpc_status
>>>> ":14}]}"
>>>>
>>>> This method also seems to suffer from the same issue as 2) that I 
>>>> am unable to control the amount of parallelism.
>>>>
>>>>
>>>> Is there anything that I am doing fundamentally wrong? I cannot 
>>>> really imagine that it is this difficult to submit a simple Python 
>>>> job to a Beam/Flink cluster.
>>>>
>>>>
>>>> Thanks for any help
>>>> Janek
>>>>
-- 

Bauhaus-Universität Weimar
Bauhausstr. 9a, R308
99423 Weimar, Germany

Phone: +49 3643 58 3577
www.webis.de


Re: Deployment difficulties with Python apps on Kubernetes Flink cluster

Posted by Janek Bevendorff <ja...@uni-weimar.de>.
Once again: one step further.

The "Client cancelled" error seems to stem from a Python interpreter 
crash due to an error in a native library. The crash itself is not 
logged at all, so I only get this totally misleading gRPC error instead. 
Could this get a better error message perhaps?

Another big issue that is still unsolved is the problem that with 
--setup-file, the dependencies get installed globally in the SDK 
container and stay there for all following jobs. That includes the wheel 
built from my local Python module and when I resubmit the job, it 
doesn't get reinstalled, because pip figures it's already there. Only 
the main Python file gets resubmitted. This is a big issue and at the 
moment I can only solve it by killing the SDK container after each job 
execution. It would be much better if dependencies got installed only 
into a temporary venv that is discarded upon job completion or failure.

Janek


On 09/12/2021 19:45, Janek Bevendorff wrote:
> Hi Kyle,
>
> Thank you for your response.
>
>
>> There are a few working Beam+Flink+k8s configurations that have been 
>> published, such as [1] [2] and [3]. If these meet your requirements, 
>> I would recommend starting from one of them before reinventing your 
>> own. Otherwise, you can look to them for clues, since they’ve likely 
>> had to solve many of the same problems.
>
> Of course I designed my deployment after those resources and for 
> the most part working (I’m not using the operator at the moment, 
> because the Helm chart is broken and the whole project seems barely 
> maintained). The problems lie with what is neither documented nor 
> solved by any of those example deployments and also with bugs in 
> either Beam or Flink or the interaction thereof.
>
>> I’m not sure what progress you have made at this point; did you get 
>> everything to work, aside from the options issue? Did setting 
>> --artifact_endpoint resolve the “client cancelled” issue?
>
> No, that problem is unsolved. It occurs randomly, but fortunately not 
> very often (and usually within the first few minutes after 
> submission). Unfortunately, there is another more common problem that 
> keeps creeping up that looks like some totally random Flink failure 
> with the error “Partition not found” followed by a long partition ID 
> hash. I think it occurs after some other failure where Flink is unable 
> to reschedule a task properly (not sure what, though). It may be to do 
> with the fact that the Beam Python SDK runner (a sidecar container 
> inside the task manager pod) is persistent for the lifetime of the 
> task manager itself. At least I had issues with that earlier when I 
> submitted a new version of my job before Flink terminated the old task 
> manager (occurs about a minute after a job has finished). The result 
> was that, among other things, the submitted Python wheel wasn’t 
> reinstalled, because the SDK container still had the old version 
> (definitely a Beam bug, pip should be called with —reinstall).
>
> I don’t know how to trigger the problem exactly, nor how to solve it. 
> But it usually crashes my job after a few hours of processing time.
>
>> If I understand correctly, the  “Discarding invalid overrides” 
>> warning is a red herring; the option should still be passed on. So I 
>> think there may be an issue elsewhere. If you could share as much of 
>> your Flink/Beam configuration as possible, it may help to debug the 
>> problem.
>
> No, it’s not passed on. I tried. This is only an issue when I submit 
> uber JARs from the client machine. It works with the FlinkRunner 
> without uber JARs as well as PortableRunner+Job Server.
>
>> I recommend starting a separate thread regarding the incorrect Python 
>> documentation, since I fear it might get buried in this thread. The 
>> more specific incorrect examples you can point out, the better. I’d 
>> also be happy to review PRs if you’re willing to update the 
>> documentation yourself (it is all open source [4]).
>
> That’d be quite a few places. Also one very annoying thing is that 
> most of the time, the imports are missing, so I have to grep the 
> Python sources to find the correct imports or guess them from the Java 
> API. I’ll see if I find the time. That would also depend on whether I 
> can solve these issues or whether I have to scrap Beam and use Flink 
> directly (or have revert back to Spark, yuck!).
>
>> Regarding stateful processing, please provide code snippets so we can 
>> reproduce the issue(s). Again, it may be better to start a separate 
>> thread since stateful processing should be mostly orthogonal to the 
>> Flink deployment architecture.
>
> I stopped using it, because I couldn’t get it to run. Perhaps I am 
> missing some sort of configuration for persisting the state, but I am 
> neither getting an error nor can I find any documentation about this 
> part. The error message thrown by the DirectRunner is also totally 
> non-descriptive. Some “Not supported” error would have saved some time 
> here. The FlinkRunner doesn’t throw errors, but shows the behaviour I 
> described (timers triggered after each process() and no state 
> persistence).
>
> Janek
>

Re: Deployment difficulties with Python apps on Kubernetes Flink cluster

Posted by Janek Bevendorff <ja...@uni-weimar.de>.
Hi Kyle,

Thank you for your response.


> There are a few working Beam+Flink+k8s configurations that have been published, such as [1] [2] and [3]. If these meet your requirements, I would recommend starting from one of them before reinventing your own. Otherwise, you can look to them for clues, since they’ve likely had to solve many of the same problems.

Of course I designed my deployment after those resources and for the most part working (I’m not using the operator at the moment, because the Helm chart is broken and the whole project seems barely maintained). The problems lie with what is neither documented nor solved by any of those example deployments and also with bugs in either Beam or Flink or the interaction thereof.

> I’m not sure what progress you have made at this point; did you get everything to work, aside from the options issue? Did setting --artifact_endpoint resolve the “client cancelled” issue?

No, that problem is unsolved. It occurs randomly, but fortunately not very often (and usually within the first few minutes after submission). Unfortunately, there is another more common problem that keeps creeping up that looks like some totally random Flink failure with the error “Partition not found” followed by a long partition ID hash. I think it occurs after some other failure where Flink is unable to reschedule a task properly (not sure what, though). It may be to do with the fact that the Beam Python SDK runner (a sidecar container inside the task manager pod) is persistent for the lifetime of the task manager itself. At least I had issues with that earlier when I submitted a new version of my job before Flink terminated the old task manager (occurs about a minute after a job has finished). The result was that, among other things, the submitted Python wheel wasn’t reinstalled, because the SDK container still had the old version (definitely a Beam bug, pip should be called with —reinstall).

I don’t know how to trigger the problem exactly, nor how to solve it. But it usually crashes my job after a few hours of processing time.

> If I understand correctly, the  “Discarding invalid overrides” warning is a red herring; the option should still be passed on. So I think there may be an issue elsewhere. If you could share as much of your Flink/Beam configuration as possible, it may help to debug the problem.

No, it’s not passed on. I tried. This is only an issue when I submit uber JARs from the client machine. It works with the FlinkRunner without uber JARs as well as PortableRunner+Job Server.

> I recommend starting a separate thread regarding the incorrect Python documentation, since I fear it might get buried in this thread. The more specific incorrect examples you can point out, the better. I’d also be happy to review PRs if you’re willing to update the documentation yourself (it is all open source [4]).

That’d be quite a few places. Also one very annoying thing is that most of the time, the imports are missing, so I have to grep the Python sources to find the correct imports or guess them from the Java API. I’ll see if I find the time. That would also depend on whether I can solve these issues or whether I have to scrap Beam and use Flink directly (or have revert back to Spark, yuck!).

> Regarding stateful processing, please provide code snippets so we can reproduce the issue(s). Again, it may be better to start a separate thread since stateful processing should be mostly orthogonal to the Flink deployment architecture.

I stopped using it, because I couldn’t get it to run. Perhaps I am missing some sort of configuration for persisting the state, but I am neither getting an error nor can I find any documentation about this part. The error message thrown by the DirectRunner is also totally non-descriptive. Some “Not supported” error would have saved some time here. The FlinkRunner doesn’t throw errors, but shows the behaviour I described (timers triggered after each process() and no state persistence).

Janek


Re: Deployment difficulties with Python apps on Kubernetes Flink cluster

Posted by Kyle Weaver <kc...@google.com>.
Hi Janek,

There are a few working Beam+Flink+k8s configurations that have been
published, such as [1] [2] and [3]. If these meet your requirements, I
would recommend starting from one of them before reinventing your own.
Otherwise, you can look to them for clues, since they’ve likely had to
solve many of the same problems.

I’m not sure what progress you have made at this point; did you get
everything to work, aside from the options issue? Did setting
--artifact_endpoint resolve the “client cancelled” issue?

If I understand correctly, the  “Discarding invalid overrides” warning is a
red herring; the option should still be passed on. So I think there may be
an issue elsewhere. If you could share as much of your Flink/Beam
configuration as possible, it may help to debug the problem.

I recommend starting a separate thread regarding the incorrect Python
documentation, since I fear it might get buried in this thread. The more
specific incorrect examples you can point out, the better. I’d also be
happy to review PRs if you’re willing to update the documentation yourself
(it is all open source [4]).

Regarding stateful processing, please provide code snippets so we can
reproduce the issue(s). Again, it may be better to start a separate thread
since stateful processing should be mostly orthogonal to the Flink
deployment architecture.

[1]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/docs/beam_guide.md
[2] https://github.com/sambvfx/beam-flink-k8s
[3]
https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb
[4] https://github.com/apache/beam/tree/master/website/www/site/content/en

On Thu, Dec 2, 2021 at 5:23 AM Janek Bevendorff <
janek.bevendorff@uni-weimar.de> wrote:

> Hi,
>
> I know that there are various options for configuring Flink using Beam
> with the Java SDK, but are there any options to do the same with the
> Python SDK? The FlinkRunnerOptions class offers only a fraction of what
> the Java FlinkPipelineOptions class provides. I would like to be able to
> set the parallelism when I submit an Uber JAR and I would also like to
> be able to set the task retry count.
>
>
> On 30/11/2021 17:49, Janek Bevendorff wrote:
> > Again, one step closer to getting this thing running:
> >
> > I have to set --artifact_endpoint as well to submit a job to a remote
> > job server, not just --job_endpoint. Would be great if the docs at
> > least mentioned that.
> >
> > I also don't understand really why there is no single option for
> > setting both the job and artifact endpoint address (without the port
> > number). They must both run in the same container, otherwise I get
> > errors about invalid job staging IDs, so having two options is kind of
> > redundant configuration.
> >
> > Janek
> >
> >
> > On 30/11/2021 16:08, Janek Bevendorff wrote:
> >> Any ideas here? I am a few steps further now, but not quite there yet.
> >>
> >> The main deployment issue can be solved by sharing only
> >> /tmp/beam-artifact-staging among job and taskmanagers, not the whole
> >> tmp directory (this is knowledge from countless hours of googling and
> >> trying things out, no documentation here whatsover). It does not
> >> solve the remote deployment issue, but so far I am at least able to
> >> submit jobs with a locally running Beam job server.
> >>
> >> Unfortunately, I am getting random gRPC terminations with this
> >> exception:
> >>
> >>   File "venv/lib/python3.7/site-packages/apache_beam/pipeline.py",
> >> line 597, in __exit__
> >>     self.result.wait_until_finish()
> >>   File
> >>
> "venv/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>
> >> line 600, in wait_until_finish
> >>     raise self._runtime_exception
> >> RuntimeError: Pipeline
> >> BeamApp-root-1130144051-d0476877_4d32d3e8-1fbb-4f2d-88d9-c2e05fd624eb
> >> failed in state FAILED:
> >> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException:
> >> CANCELLED: client cancelled
> >>
> >> The error occurs randomly without any indication why. Do you have any
> >> idea what may be wrong with the gRPC connection or also what I may be
> >> missing for the remote job server deployment?
> >>
> >>
> >> Besides these questions, I also have a little rant (sorry for that,
> >> but I have to get this off my chest):
> >>
> >> I am getting a extremely frustrated with the Python documentation,
> >> which is often incomplete, sometimes outdated and occasionally plain
> >> wrong. I can tell that many examples were never actually run, because
> >> they contain invalid Python code, function names differ from what the
> >> API actually offers, parentheses are in the wrong places etc. One
> >> particular example is the splittable DoFn documentation. The original
> >> blog post is entirely outdated and also contains invalid Python code
> >> (missing self parameters of methods and such), but also the online
> >> manual is wrong (missing constructor parameters or required method
> >> overrides here, wrong parentheses there...). To understand how
> >> everything works, I am basically reverse engineering the code, taking
> >> into account the little API documentation there is. This is beyond
> >> annoying.
> >>
> >> I also noticed that stateful processing is completely broken
> >> apparently. With the local runner, it doesn't run at all (various
> >> exceptions thrown) and with the FlinkRunner or PortableRunner,
> >> BagStateSpecs or other state parameters are always empty and
> >> watermark timers fire after each invocation of process(). I tried
> >> countless potential solutions, but nothing works, so I gave up and
> >> resorted to using a CombineFn-based PTransform instead.
> >>
> >> Janek
> >>
> >>
> >> On 26/11/2021 17:01, Janek Bevendorff wrote:
> >>> I am one step further, but also not really.
> >>>
> >>> When I mount the shared drive that serves /tmp on the Flink job and
> >>> task managers also on my local machine and then spin up a local Beam
> >>> job server with this volume mounted on /tmp as well, I can get my
> >>> job to start. This is ugly as hell, because it requires so many
> >>> extra steps, but at least it's progress.
> >>>
> >>> Unfortunately, the job doesn't run properly and fails with
> >>>
> >>>   File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line
> >>> 462, in find_class
> >>>     return StockUnpickler.find_class(self, module, name)
> >>> ModuleNotFoundError: No module named 'XXXX'
> >>>
> >>> where XXX is the my application module that I deploy with
> >>> --setup_file. When I download the workflow.tar.gz from the staging
> >>> directory, I can confirm that the module is present.
> >>>
> >>> This isn't working as intended at all. Also what happens if multiple
> >>> users submit applications at the same time? All the Beam stuff in
> >>> /tmp has random names, but the stages/workflow.tar.gz file that is
> >>> provided for the Python SDK sidecar container has the same name for
> >>> each job. Hence it would be impossible to serve multiple users with
> >>> this setup.
> >>>
> >>> Janek
> >>>
> >>>
> >>> On 26/11/2021 15:32, Janek Bevendorff wrote:
> >>>> Hi,
> >>>>
> >>>> Currently, I am struggling with getting Beam to run on a
> >>>> Kubernetes-hosted Flink cluster and there is very little to no
> >>>> documentation on how to resolve my deployment issues (besides a few
> >>>> Stackoverflow threads without solutions).
> >>>>
> >>>> I have a Flink job server running on Kubernetes that creates new
> >>>> taskmanager pods from a pod template when I submit a job. Each
> >>>> taskmanager pod has a sidecar container running the Beam Python SDK
> >>>> image.
> >>>>
> >>>> With this setup in place, I tried multiple methods to submit a
> >>>> Python Beam job and all of them fail for different reasons:
> >>>>
> >>>> 1) Run my Python job via the FlinkRunner and set
> >>>> --environment_type=EXTERNAL
> >>>>
> >>>> This works perfectly fine locally, but fails when I set
> >>>> --flink_master to the Kubernetes load balancer IP to submit to the
> >>>> remote Kubernetes cluster. It  allows me to submit the job itself
> >>>> successfully, but not the necessary staging data. The Flink
> >>>> container shows
> >>>>
> >>>> java.io.FileNotFoundException:
> >>>>
> /tmp/beam-temp7hxxe2gs/artifacts2liu9b8y/779b17e6efab2bbfcba170762d1096fe2451e0e76c4361af9a68296f23f4a4ec/1-ref_Environment_default_e-workflow.tar.gz
>
> >>>> (No such file or directory)
> >>>>
> >>>> and the Python worker shows
> >>>>
> >>>> 2021/11/26 14:16:24 Failed to retrieve staged files: failed to
> >>>> retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for
> >>>> /tmp/staged/workflow.tar.gz
> >>>>         caused by:
> >>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> >>>> /tmp/staged/workflow.tar.gz
> >>>>         caused by:
> >>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> >>>> /tmp/staged/workflow.tar.gz
> >>>>         caused by:
> >>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> >>>> /tmp/staged/workflow.tar.gz
> >>>>         caused by:
> >>>> rpc error: code = Unknown desc =
> >>>>
> >>>> I found a Stackoverflow issue with the exact same issue, but
> >>>> without a solution. The file seems to exist only under /tmp on my
> >>>> local client machine, which is useless.
> >>>>
> >>>> 2) Submit the job with --flink_submit_uber_jar=True
> >>>>
> >>>> This will submit the staging information correctly, but I cannot
> >>>> set the amount of parallelism. Instead I get the following warning:
> >>>>
> >>>> WARNING:apache_beam.options.pipeline_options:Discarding invalid
> >>>> overrides: {'parallelism': 100}
> >>>>
> >>>> and the job runs with only a single worker (useless as well).
> >>>>
> >>>> 3) Spawn another job manager sidecar container running the Beam job
> >>>> server and submit via the PortableRunner
> >>>>
> >>>> This works (somewhat) when I run the job server image locally with
> >>>> --network=host, but I cannot get it to work on Kubernetes. I
> >>>> exposed the ports 8097-8099 on the load balancer IP, but when I
> >>>> submit a job, I only get
> >>>>
> >>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous
> >>>> of RPC that terminated with:
> >>>>         status = StatusCode.UNAVAILABLE
> >>>>         details = "failed to connect to all addresses"
> >>>>         debug_error_string =
> >>>> "{"created":"@1637934464.499518882","description":"Failed to pick
> >>>> subchannel","file":"src/core
> >>>>
> /ext/filters/client_channel/client_channel.cc","file_line":3158,"referenced_errors":[{"created":"@1637934464.499518362","de
>
> >>>>
> >>>> scription":"failed to connect to all
> >>>>
> addresses","file":"src/core/lib/transport/error_utils.cc","file_line":147,"grpc_status
> >>>> ":14}]}"
> >>>>
> >>>> This method also seems to suffer from the same issue as 2) that I
> >>>> am unable to control the amount of parallelism.
> >>>>
> >>>>
> >>>> Is there anything that I am doing fundamentally wrong? I cannot
> >>>> really imagine that it is this difficult to submit a simple Python
> >>>> job to a Beam/Flink cluster.
> >>>>
> >>>>
> >>>> Thanks for any help
> >>>> Janek
> >>>>
> --
>
> Bauhaus-Universität Weimar
> Bauhausstr. 9a, R308
> 99423 Weimar, Germany
>
> Phone: +49 3643 58 3577 <+49%203643%20583577>
> www.webis.de
>
>