Posted to user@beam.apache.org by Jeremy Lewi <je...@primer.ai> on 2022/02/11 13:23:47 UTC

Why is portable python runner trying to stage a flink job server jar?

Hi Folks,

I'm using a patched build of the Apache Beam 2.35 Python SDK, running on
Flink on Kubernetes using the PortableJobRunner.

It looks like when submitting the job, the runner tries to upload a large
274 MB Flink job server jar to the staging service.

This doesn't seem right. I already have an instance of the JobServer
running, and my program is talking to it, so why is the runner trying to
stage the JobServer jar?

I believe this problem started when I upgraded to 2.35.

When I debugged this, I found the runner getting stuck in offer_artifacts:
https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235

The rolePayload of the request in question was:
rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar

How does the runner decide which artifacts to upload? Could this be caused
by using a patched version of the Python SDK with an unpatched job server
jar? The Python version (2.35.0.dev2) doesn't match the JobServer version
(2.35.0), so perhaps the portable runner thinks it needs to upload the jar.

We build our own version of the Python SDK because we need a fix for
https://issues.apache.org/jira/browse/BEAM-12244.

When we were using 2.33, we were also building our own Flink jars to pull
in Kafka patches that hadn't been released yet.

Thanks
J

Re: Why is portable python runner trying to stage a flink job server jar?

Posted by Jeremy Lewi <je...@primer.ai>.
Thanks!


Re: Why is portable python runner trying to stage a flink job server jar?

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Jeremy,

By default, we stage all jars on the CLASSPATH of the expansion service.
You can override this by setting the filesToStage option when starting the
expansion service:
https://github.com/apache/beam/blob/7fa5387ffac4f2801077f2e55aa2eba7a47036d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java#L38
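
As a hedged sketch of what that override might look like when launching a
Java expansion service, the helper below builds the command line; the jar
name, port, and staged path are placeholders, not canonical Beam artifacts:

```python
# Sketch: launch a Java expansion service with an explicit artifact list,
# so only the jars you name get staged instead of the whole CLASSPATH.
# The jar name, port, and path below are illustrative placeholders.
def expansion_service_cmd(jar_path, port, files_to_stage):
    """Build the command line for starting the expansion service.

    filesToStage is the option from Beam's FileStagingOptions; passing it
    overrides the default of staging every jar on the CLASSPATH.
    """
    return [
        "java", "-jar", jar_path, str(port),
        "--filesToStage=" + ",".join(files_to_stage),
    ]

cmd = expansion_service_cmd(
    "beam-sdks-java-io-expansion-service-2.35.0.jar",  # illustrative name
    8097,                                              # illustrative port
    ["/opt/jars/kafka-io-patched.jar"],                # illustrative path
)
print(" ".join(cmd))
```

With this, the staging service would only see the jars you listed, rather
than the full expansion-service CLASSPATH.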

Thanks,
Cham


Re: Why is portable python runner trying to stage a flink job server jar?

Posted by Jeremy Lewi <je...@primer.ai>.
Hi Folks,

So I think this is what's happening. My pipeline has multi-language
transforms because it's using Kafka IO. When the runner expands those
transforms, it contacts the expansion service, which responds with a list
of jars. The runner then downloads those jars.

Then, when the runner stages artifacts, it ends up uploading those jars to
the staging service.

This seems unnecessary and inefficient. I was able to hack the
portable_runner.py code to test this out. When I did, my job submitted
quite quickly (the pipeline's running, but I haven't verified it's
working, so it's possible I broke something).
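
The hack boiled down to filtering the artifact list before staging. A
minimal sketch of that idea, with plain strings standing in for the
rolePayload values on staging requests (the helper name and the second
jar name are illustrative, not Beam's actual API):

```python
# Sketch: drop the Flink job server jar from the artifact list before
# handing it to the staging service. Artifact names are plain strings
# here; in the real runner they appear as rolePayload fields.
JOB_SERVER_PREFIX = "beam-runners-flink-job-server"

def artifacts_to_stage(artifacts):
    """Keep every artifact except job server jars."""
    return [a for a in artifacts if not a.startswith(JOB_SERVER_PREFIX)]

staged = artifacts_to_stage([
    "beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar",
    "beam-sdks-java-io-kafka-2.35.0.jar",  # illustrative second artifact
])
print(staged)  # → ['beam-sdks-java-io-kafka-2.35.0.jar']
```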

Is this working as intended? Is there some way to avoid this without
having to hack the runner code?

Interestingly, downloading the jars seems much faster than uploading
them, but I haven't investigated why.

J
