Posted to user@flink.apache.org by Wouter Zorgdrager <zo...@gmail.com> on 2021/08/14 08:47:05 UTC

Fwd: PyFlink performance and deployment issues

Hi all,

I'm still dealing with the PyFlink deployment issue described below. I
see that I accidentally didn't forward it to the mailing list. Anyway, my
job is stuck in `Initializing` and the logs don't give me a clue about
what is going on.
In my IDE it runs fine. The command I use to submit to the cluster:

export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar
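
For completeness, a sketch of the programmatic counterpart of the
-pyarch/--pyExecutable flags, using PyFlink's documented dependency
management APIs (the archive name and layout below match my setup):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship the archived virtualenv with the job and point the Python
# workers at the interpreter inside it.
env.add_python_archive("venv.zip")
env.set_python_executable("venv.zip/venv/bin/python")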

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter
---------- Forwarded message ---------
From: Wouter Zorgdrager <zo...@gmail.com>
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang <hx...@gmail.com>


Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081
(the WebUI port) rather than the job submission port (--target remote -m
localhost:8081). For some reason, this does not give an error or warning
and just starts a local cluster. However, now I've hit another issue: my job
is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.

I run with parallelism 1, and these are the latest log lines from the TM
(there is no obvious error):
2021-07-08 12:12:18,729 INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic client_reply
2021-07-08 12:12:18,815 INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,816 INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:19,119 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-1] Cluster ID: NRL80FEjRjWzJKcqedz80A
2021-07-08 12:12:20,138 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-07-08 12:12:20,138 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-07-08 12:12:20,139 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-07-08 12:12:20,139 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-07-08 12:12:20,139 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-07-08 12:12:20,141 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2021-07-08 12:12:20,142 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-07-08 12:12:20,953 INFO  org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: null
2021-07-08 12:12:20,953 INFO  org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /tmp/python-dist-98c4419a-2048-4d9c-a082-7f83ef35c35c/python-archives
2021-07-08 12:12:27,869 INFO  org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: venv.zip/venv/bin/python

I trimmed some of the Kafka config output. Any thoughts on what I am doing
wrong? It seems to be stuck setting up the Python environment/connections.

Thanks!
Wouter

On Thu, 8 Jul 2021 at 07:17, Xingbo Huang <hx...@gmail.com> wrote:

> Hi Wouter,
> Sorry for the late reply. I will try to answer your questions in detail.
>
> 1. >>> Performance problem.
> When running a UDF job locally, Beam uses a loopback mode to connect back
> to the Python process that compiled the job, so the job starts up faster
> than in PyFlink, which creates a new Python process to execute the UDF code.
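>
> For reference, a minimal sketch of the Beam side of this comparison
> (assumptions: a portable runner such as FlinkRunner and a toy pipeline;
> LOOPBACK makes the submitting Python process act as the SDK worker):
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # LOOPBACK reuses the submitting Python process as the SDK worker,
> # so no separate worker process is spawned for UDF execution.
> options = PipelineOptions(["--runner=FlinkRunner", "--environment_type=LOOPBACK"])
> with beam.Pipeline(options=options) as p:
>     _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)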
>
> 2. >>> However, this command created a local MiniCluster again rather than
> submitting it to my remote cluster.
> I was able to successfully submit a Python job to a standalone cluster with
> the following commands:
>
> ./bin/start-cluster.sh
> ./bin/flink run --target remote \
> -m localhost:8086 \
> -pyarch /Users/duanchen/venv/venv.zip \
> -pyexec venv.zip/venv/bin/python \
> --parallelism 1 \
> --python /Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py \
> --jarfile /Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar
>
> The situation you encountered is very strange.
>
> 3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster
> using the following command:
>
> When running in Application mode, you need to make sure that all paths are
> accessible by the JobManager of your application. The path
> ~/Documents/runtime.py is on your client machine, right? You need to use a
> path that exists inside your k8s cluster, e.g. one baked into the container
> image. This part of the documentation does not explain these implicit
> requirements well.
>
> 4. >>> Lastly, I wondered if it is possible to set a key for events sent
> to the KafkaProducer.
> You can check whether the Kafka Table Connector [1] meets your needs.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features
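>
> For illustration, a minimal sketch of such a table sink with a message key
> (hypothetical field names; 'key.format'/'key.fields' are the connector
> options that control the Kafka record key):
>
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_streaming_mode().build())
> # 'key.fields' selects which columns are written into the Kafka record key.
> t_env.execute_sql("""
>     CREATE TABLE client_reply (
>         user_id STRING,
>         payload STRING
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'client_reply',
>         'properties.bootstrap.servers' = 'localhost:9092',
>         'key.format' = 'raw',
>         'key.fields' = 'user_id',
>         'value.format' = 'json'
>     )
> """)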
>
> Best,
> Xingbo
>
> On Tue, 6 Jul 2021 at 16:58, Wouter Zorgdrager <zo...@gmail.com> wrote:
>
>> Dear community,
>>
>> I have been struggling a lot with the deployment of my PyFlink job.
>> Moreover, the performance seems very disappointing, especially the latency
>> at low throughput. I have been playing around with configuration values,
>> but it has not improved.
>> In short, I have a DataStream job with multiple Python operators,
>> including a ProcessFunction. The job reads from Kafka and writes back to
>> Kafka. For single events, E2E latency has been somewhere between 600 ms and
>> 2000 ms. When I increase the throughput, latency grows to the order of
>> seconds.
>> This is when I configure my job like this:
>>         config.set_integer("python.fn-execution.bundle.time", 1)
>>         config.set_integer("python.fn-execution.bundle.size", 1)
>> I tried several configuration values, but the results are similar.
>> Interestingly, I have a similar Python streaming application written in
>> Apache Beam which does have low latency: single events are processed in
>> under 30 ms. If I recall correctly, Beam uses the same technique of
>> bundling events and sending them to Python processes.
>> On the other hand, Beam uses an in-memory runner when running locally,
>> which might change the situation. I'm not sure how that compares to a local
>> Flink MiniCluster.
>>
>> I hoped that performance might improve when deploying this on a (remote)
>> Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
>> job to a remote Flink cluster. In my first attempt, I created a local TM +
>> JM setup and tried to deploy it using the "./flink run" command.
>> However, this command created a local MiniCluster again rather than
>> submitting it to my remote cluster. The full command was:
>> ./flink run --target remote \
>> -m localhost:8081 \
>> -pyarch venv.zip \
>> -pyexec venv.zip/venv/bin/python \
>> --parallelism 4 \
>> --python ~/Documents/runtime.py \
>> --jarfile ~/Documents/combined.jar
>>
>> Note that venv.zip stores all the Python dependencies for my PyFlink job,
>> whereas combined.jar stores the Java dependencies. I tried several variants
>> of this command, but it never submitted to my running JobManager and always
>> ran locally.
>> In my second attempt, I tried deploying it to a Kubernetes cluster using
>> the following command:
>>
>> ./flink run-application \
>> --target kubernetes-application \
>> -Dkubernetes.cluster-id=flink-cluster \
>> -Dtaskmanager.memory.process.size=4096m \
>> -Dkubernetes.taskmanager.cpu=2 \
>> -Dkubernetes.service-account=flink-service-account \
>> -Dtaskmanager.numberOfTaskSlots=4 \
>> -Dkubernetes.container.image=pyflink:latest \
>> -pyarch venv.zip \
>> -pyexec venv.zip/venv/bin/python \
>> --parallelism 4 \
>> -py ~/Documents/runtime.py \
>> --jarfile ~/Documents/combined.jar
>>
>> I created the pyflink:latest image by following the documentation here
>> [1]. It was unclear to me whether I had to include my project files in this
>> Docker image.
>> When running it like this, it did submit to the remote K8s cluster, but I
>> got an exception that it could not find my runtime.py file in some sort of
>> tmp folder.
>>
>> Lastly, I wondered if it is possible to set a key for events sent to the
>> KafkaProducer. Right now, it seems you can only configure some (static)
>> properties and the serializer.
>> Is there a workaround to be able to set the key and value of an event in
>> PyFlink?
>>
>> I hope you can help me out with my struggles! Thanks in advance.
>>
>> Regards,
>> Wouter
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
>>
>>
>

Re: PyFlink performance and deployment issues

Posted by Dian Fu <di...@gmail.com>.
Hi Wouter,

I suspect that it's transferring the file venv.zip, and so it may take some time. Is it stuck there forever? Could you share the log files?

Regards,
Dian
