Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/03/05 21:28:55 UTC
Running Pyflink job on K8s Flink Cluster Deployment?
Hello everyone,
I'm looking to run a PyFlink application in a distributed fashion on
Kubernetes, and am currently facing issues. I've successfully gotten a
Scala Flink application to run using the manifests provided at [0].
I attempted to run the application by updating the jobmanager command args
from
args: ["standalone-job", "--job-classname", "com.job.ClassName",
<optional arguments>, <job arguments>]
to
args: ["standalone-job", "--python", "my_python_app.py", <optional
arguments>, <job arguments>]
But this didn't work. It resulted in the following error:
Caused by: java.lang.LinkageError: loader constraint violation: loader
org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
org.apache.commons.cli.Options. A different class with the same name was
previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
module of loader 'app'
I was able to get things to 'run' by setting args to:
args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
But I'm not sure if things were running in a distributed fashion or not.
1/ Is there a good way to check if the task pods were being correctly
utilized?
2/ Are there any similar examples to [0] for how to run Pyflink jobs on
kubernetes?
Open to any suggestions you may have. Note: we'd prefer not to use the
native K8s route outlined at [1] because we need to maintain the
ability to customize certain aspects of the deployment (e.g. mounting SSDs
to some of the pods).
Thanks in advance!
[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
Re: Running Pyflink job on K8s Flink Cluster Deployment?
Posted by Kevin Lam <ke...@shopify.com>.
Awesome, thanks Shuiqiang! I was able to get an example running by
referencing your configs.
Re: Running Pyflink job on K8s Flink Cluster Deployment?
Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Kevin,
For your information, below is an example of running a PyFlink Table API
WordCount job.
1. Building a Docker image with Python and PyFlink installed:

Dockerfile:

FROM flink:1.12.0

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink==1.12.0
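(Editor's note, not part of the original message: one thing worth keeping in sync in this Dockerfile is the Flink base image tag and the apache-flink wheel version, since mismatched releases are a common source of hard-to-diagnose errors. A hypothetical helper that checks the two pins agree:)

```python
# Illustrative sketch: verify that the FROM flink:<tag> base image and the
# apache-flink==<version> pip pin in a Dockerfile refer to the same release.
import re

def check_versions(dockerfile_text: str) -> bool:
    """Return True if the base image tag and the wheel version pin agree."""
    base = re.search(r"FROM\s+flink:(\S+)", dockerfile_text)
    wheel = re.search(r"apache-flink==(\S+)", dockerfile_text)
    if not (base and wheel):
        raise ValueError("could not find both version pins")
    return base.group(1) == wheel.group(1)

dockerfile = """\
FROM flink:1.12.0
RUN pip3 install apache-flink==1.12.0
"""
print(check_versions(dockerfile))  # True
```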
2. Resource definitions:

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
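(Editor's note, an illustrative sketch rather than part of the original message: the flat key: value entries in the flink-conf.yaml fragment above can be parsed and sanity-checked against the ports that the Service and container specs expose, since a mismatch there is an easy way to get a silently broken cluster.)

```python
# Parse the flat flink-conf.yaml fragment into a dict and check the ports
# it declares against the values used in the Kubernetes manifests.
FLINK_CONF = """\
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
"""

def parse_flat_conf(text: str) -> dict:
    """Parse simple `key: value` lines, skipping blanks and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(":")
        conf[key.strip()] = value.strip()
    return conf

conf = parse_flat_conf(FLINK_CONF)
# These must agree with the containerPort/port values in the manifests.
assert conf["jobmanager.rpc.port"] == "6123"
assert conf["blob.server.port"] == "6124"
assert conf["taskmanager.rpc.port"] == "6122"
print(conf["parallelism.default"])  # 2
```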
jobmanager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
job-manager.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: pyflink:v1
          args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
      securityContext:
        runAsUser: 9999  # refers to user _flink_ from the official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
task-manager.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: pyflink:v1
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
      securityContext:
        runAsUser: 9999  # refers to user _flink_ from the official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml
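(Editor's note, an illustrative sketch that was not part of the original message: one concrete way to answer question 1 from the original post, whether the task pods are actually utilized, is to summarize the output of `kubectl get pods -o json` by component label and phase. The taskmanager pods should be Running while the jobmanager Job runs to completion.)

```python
# Count pod phases per `component` label from `kubectl get pods -o json` output.
import json
from collections import Counter

def pod_phases(kubectl_json: str) -> Counter:
    """Map (component label, phase) pairs to pod counts."""
    items = json.loads(kubectl_json)["items"]
    return Counter(
        (pod["metadata"]["labels"].get("component", "?"), pod["status"]["phase"])
        for pod in items
    )

# A fake payload standing in for real kubectl output:
sample = json.dumps({"items": [
    {"metadata": {"labels": {"component": "jobmanager"}},  "status": {"phase": "Succeeded"}},
    {"metadata": {"labels": {"component": "taskmanager"}}, "status": {"phase": "Running"}},
    {"metadata": {"labels": {"component": "taskmanager"}}, "status": {"phase": "Running"}},
]})
print(pod_phases(sample)[("taskmanager", "Running")])  # 2
```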
Best,
Shuiqiang
Re: Running Pyflink job on K8s Flink Cluster Deployment?
Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Kevin,
You are able to run PyFlink applications on a Kubernetes cluster; both
native K8s mode and resource definition mode are supported since release
1.12.0. Currently, Python and PyFlink are not enabled in the official
Flink Docker image, so you might need to build a custom image with Python
and PyFlink installed; please refer to Enabling Python in Docker
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>.

Generally, by setting the value of the args field in
`jobmanager-application.yaml` to args: ["standalone-job", "--python",
"my_python_app.py", <optional arguments>, <job arguments>], the job manager
will try to submit a PyFlink job with the specified Python file once it is
started. You can check the pod status for the jobmanager and taskmanagers via
`kubectl get pods [-n namespace]`. The jobmanager pod will turn to the
Completed state once the job is finished (or to the Error state if there is
something wrong), while the taskmanager pods will always be in the Running
state.

Finally, it is up to you to tear down the cluster by deleting all created
resources (jobmanager/taskmanager jobs, the flink-conf ConfigMap, the
jobmanager service, etc.).
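(Editor's note: the "wait until the jobmanager pod finishes" check described above can be scripted with a small polling helper. This is an illustrative sketch, not from the thread; `get_phase` stands in for a real call such as `kubectl get pod <name> -o jsonpath='{.status.phase}'`.)

```python
# Poll a phase source until the jobmanager pod reaches a terminal state.
import time

def wait_for_job(get_phase, timeout_s: float = 600, poll_s: float = 1) -> str:
    """Call get_phase() until it returns Succeeded or Failed, or time out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        phase = get_phase()
        if phase in ("Succeeded", "Failed"):
            return phase
        time.sleep(poll_s)
    raise TimeoutError("jobmanager pod did not finish in time")

# Usage with a fake phase source (a real one would shell out to kubectl):
phases = iter(["Pending", "Running", "Succeeded"])
print(wait_for_job(lambda: next(phases), poll_s=0))  # Succeeded
```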
Best,
Shuiqiang