You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/03/05 21:28:55 UTC

Running Pyflink job on K8s Flink Cluster Deployment?

Hello everyone,

I'm looking to run a Pyflink application run in a distributed fashion,
using kubernetes, and am currently facing issues. I've successfully gotten
a Scala Flink Application to run using the manifests provided at [0]

I attempted to run the application by updating the jobmanager command args
from

 args: ["standalone-job", "--job-classname", "com.job.ClassName",
<optional arguments>, <job arguments>]

to

args: ["standalone-job", "--python", "my_python_app.py", <optional
arguments>, <job arguments>]

But this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader
org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
org.apache.commons.cli.Options. A different class with the same name was
previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
module of loader 'app'

I was able to get things to 'run' by setting args to:

args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]


But I'm not sure if things were running in a distributed fashion or not.

1/ Is there a good way to check if the task pods were being correctly
utilized?

2/ Are there any similar examples to [0] for how to run Pyflink jobs on
kubernetes?

Open to any suggestions you may have. Note: we'd prefer not to run using
the native K8S route outlined at [1] because we need to maintain the
ability to customize certain aspects of the deployment (eg. mounting SSDs
to some of the pods)

Thanks in advance!

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode

Re: Running Pyflink job on K8s Flink Cluster Deployment?

Posted by Kevin Lam <ke...@shopify.com>.
Awesome, thanks Shuiqiang! I was able to get an example running by
referencing your configs.

On Sat, Mar 6, 2021 at 7:12 AM Shuiqiang Chen <ac...@gmail.com> wrote:

> Hi Kevin,
>
> For your information, bellow is an example for running a PyFlink table API
> WordCount job.
>
> 1. Building a Docker image with Python and PyFlink Installed:
>
> Dockerfile:
>
> FROM flink:1.12.0
>
>
> # install python3 and pip3
> RUN apt-get update -y && \
> apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf
> /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> # install Python Flink
>
> RUN pip3 install apache-flink==1.12.0
>
> 2. Resource definitions:
>
> Flink-configuration-configmap.yaml:
>
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: flink-jobmanager
>     taskmanager.numberOfTaskSlots: 2
>     blob.server.port: 6124
>     jobmanager.rpc.port: 6123
>     taskmanager.rpc.port: 6122
>     queryable-state.proxy.ports: 6125
>     jobmanager.memory.process.size: 1600m
>     taskmanager.memory.process.size: 1728m
>     parallelism.default: 2
>   log4j-console.properties: |+
>     # This affects logging for both user code and Flink
>     rootLogger.level = INFO
>     rootLogger.appenderRef.console.ref = ConsoleAppender
>     rootLogger.appenderRef.rolling.ref = RollingFileAppender
>
>     # Uncomment this if you want to _only_ change Flink's logging
>     #logger.flink.name = org.apache.flink
>     #logger.flink.level = INFO
>
>     # The following lines keep the log level of common
> libraries/connectors on
>     # log level INFO. The root logger does not override this. You have to
> manually
>     # change the log levels here.
>     logger.akka.name = akka
>     logger.akka.level = INFO
>     logger.kafka.name= org.apache.kafka
>     logger.kafka.level = INFO
>     logger.hadoop.name = org.apache.hadoop
>     logger.hadoop.level = INFO
>     logger.zookeeper.name = org.apache.zookeeper
>     logger.zookeeper.level = INFO
>
>     # Log all infos to the console
>     appender.console.name = ConsoleAppender
>     appender.console.type = CONSOLE
>     appender.console.layout.type = PatternLayout
>     appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
> %-60c %x - %m%n
>
>     # Log all infos in the given rolling file
>     appender.rolling.name = RollingFileAppender
>     appender.rolling.type = RollingFile
>     appender.rolling.append = false
>     appender.rolling.fileName = ${sys:log.file}
>     appender.rolling.filePattern = ${sys:log.file}.%i
>     appender.rolling.layout.type = PatternLayout
>     appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
> %-60c %x - %m%n
>     appender.rolling.policies.type = Policies
>     appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
>     appender.rolling.policies.size.size=100MB
>     appender.rolling.strategy.type = DefaultRolloverStrategy
>     appender.rolling.strategy.max = 10
>
>     # Suppress the irrelevant (wrong) warnings from the Netty channel
> handler
>     logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>     logger.netty.level = OFF
>
> Job-manager-service.yaml:
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob-server
>     port: 6124
>   - name: webui
>     port: 8081
>   selector:
>     app: flink
>     component: jobmanager
>
> Job-manager.yaml
>
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: flink-jobmanager
> spec:
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       restartPolicy: OnFailure
>       containers:
>         - name: jobmanager
>           image: pyflink:v1
>           env:
>           args: ["standalone-job", "-py",
> "/opt/flink/examples/python/table/batch/word_count.py"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 8081
>               name: webui
>           livenessProbe:
>             tcpSocket:
>               port: 6123
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>           securityContext:
>             runAsUser: 9999  # refers to user _flink_ from official flink
> image, change if necessary
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>
> Task-manager.yaml
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 2
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>       - name: taskmanager
>         image: pyflink:v1
>         env:
>         args: ["taskmanager"]
>         ports:
>         - containerPort: 6122
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>         livenessProbe:
>           tcpSocket:
>             port: 6122
>           initialDelaySeconds: 30
>           periodSeconds: 60
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf/
>         securityContext:
>           runAsUser: 9999  # refers to user _flink_ from official flink
> image, change if necessary
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>
> 3. Creating resources:
>
> $ kubectl create -f flink-configuration-configmap.yaml$ kubectl create -f jobmanager-service.yaml# Create the deployments for the cluster$ kubectl create -f job-manager.yaml$ kubectl create -f task-manager.yaml
>
> Best,
> Shuiqiang
>
> Shuiqiang Chen <ac...@gmail.com> 于2021年3月6日周六 下午5:10写道:
>
>> Hi Kevin,
>>
>> You are able to run PyFlink applications on kuberetes cluster, both
>> native k8s mode and resource definition mode are supported since
>> release-1.12.0. Currently, Python and PyFlink are not enabled in official
>> flink docker image, that you might need to build a custom image with Python
>> and PyFlink install, please refer to Enbale Python in docker
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
>> .
>>
>> Generally, by setting the value of args field in
>> `jobmanager-application.yaml` to be args: ["standalone-job", "--python",
>> "my_python_app.py", <optional arguments>, <job arguments>] the job
>> manager will try to submit a PyFlink job with the specified python file
>> once it is started. You can check the pod status for jobmanger and
>> taskmanger via `kubectl get pods [-n namespace]`. The job manger pod will
>> turn to the completed state once the job is finished or error state if
>> there is something wrong, while the task manger pod will always be in the
>> running state.
>>
>> Finally, it requires you to tear down the cluster by deleting all created
>> resources (jobmanger/taskmanger jobs, flink-conf configmap,
>> jobmanger-service, etc).
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>> Kevin Lam <ke...@shopify.com> 于2021年3月6日周六 上午5:29写道:
>>
>>> Hello everyone,
>>>
>>> I'm looking to run a Pyflink application run in a distributed fashion,
>>> using kubernetes, and am currently facing issues. I've successfully gotten
>>> a Scala Flink Application to run using the manifests provided at [0]
>>>
>>> I attempted to run the application by updating the jobmanager command
>>> args from
>>>
>>>  args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]
>>>
>>> to
>>>
>>> args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]
>>>
>>> But this didn't work. It resulted in the following error:
>>>
>>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>>> org.apache.commons.cli.Options. A different class with the same name was
>>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>>> module of loader 'app'
>>>
>>> I was able to get things to 'run' by setting args to:
>>>
>>> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>>>
>>>
>>> But I'm not sure if things were running in a distributed fashion or not.
>>>
>>> 1/ Is there a good way to check if the task pods were being correctly
>>> utilized?
>>>
>>> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
>>> kubernetes?
>>>
>>> Open to any suggestions you may have. Note: we'd prefer not to run using
>>> the native K8S route outlined at [1] because we need to maintain the
>>> ability to customize certain aspects of the deployment (eg. mounting SSDs
>>> to some of the pods)
>>>
>>> Thanks in advance!
>>>
>>> [0]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>>>
>>>

Re: Running Pyflink job on K8s Flink Cluster Deployment?

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Kevin,

For your information, bellow is an example for running a PyFlink table API
WordCount job.

1. Building a Docker image with Python and PyFlink Installed:

Dockerfile:

FROM flink:1.12.0


# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf
/var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink

RUN pip3 install apache-flink==1.12.0

2. Resource definitions:

Flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors
on
    # log level INFO. The root logger does not override this. You have to
manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel
handler
    logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

Job-manager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

Job-manager.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: pyflink:v1
          env:
          args: ["standalone-job", "-py",
"/opt/flink/examples/python/table/batch/word_count.py"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink
image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

Task-manager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink
image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml$ kubectl create
-f jobmanager-service.yaml# Create the deployments for the cluster$
kubectl create -f job-manager.yaml$ kubectl create -f
task-manager.yaml

Best,
Shuiqiang

Shuiqiang Chen <ac...@gmail.com> 于2021年3月6日周六 下午5:10写道:

> Hi Kevin,
>
> You are able to run PyFlink applications on kuberetes cluster, both native
> k8s mode and resource definition mode are supported since release-1.12.0.
> Currently, Python and PyFlink are not enabled in official flink docker
> image, that you might need to build a custom image with Python and PyFlink
> install, please refer to Enbale Python in docker
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
> .
>
> Generally, by setting the value of args field in
> `jobmanager-application.yaml` to be args: ["standalone-job", "--python",
> "my_python_app.py", <optional arguments>, <job arguments>] the job
> manager will try to submit a PyFlink job with the specified python file
> once it is started. You can check the pod status for jobmanger and
> taskmanger via `kubectl get pods [-n namespace]`. The job manger pod will
> turn to the completed state once the job is finished or error state if
> there is something wrong, while the task manger pod will always be in the
> running state.
>
> Finally, it requires you to tear down the cluster by deleting all created
> resources (jobmanger/taskmanger jobs, flink-conf configmap,
> jobmanger-service, etc).
>
> Best,
> Shuiqiang
>
>
>
> Kevin Lam <ke...@shopify.com> 于2021年3月6日周六 上午5:29写道:
>
>> Hello everyone,
>>
>> I'm looking to run a Pyflink application run in a distributed fashion,
>> using kubernetes, and am currently facing issues. I've successfully gotten
>> a Scala Flink Application to run using the manifests provided at [0]
>>
>> I attempted to run the application by updating the jobmanager command
>> args from
>>
>>  args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]
>>
>> to
>>
>> args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]
>>
>> But this didn't work. It resulted in the following error:
>>
>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>> org.apache.commons.cli.Options. A different class with the same name was
>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>> module of loader 'app'
>>
>> I was able to get things to 'run' by setting args to:
>>
>> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>>
>>
>> But I'm not sure if things were running in a distributed fashion or not.
>>
>> 1/ Is there a good way to check if the task pods were being correctly
>> utilized?
>>
>> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
>> kubernetes?
>>
>> Open to any suggestions you may have. Note: we'd prefer not to run using
>> the native K8S route outlined at [1] because we need to maintain the
>> ability to customize certain aspects of the deployment (eg. mounting SSDs
>> to some of the pods)
>>
>> Thanks in advance!
>>
>> [0]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>>
>>

Re: Running Pyflink job on K8s Flink Cluster Deployment?

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Kevin,

You are able to run PyFlink applications on kuberetes cluster, both native
k8s mode and resource definition mode are supported since release-1.12.0.
Currently, Python and PyFlink are not enabled in official flink docker
image, that you might need to build a custom image with Python and PyFlink
install, please refer to Enbale Python in docker
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
.

Generally, by setting the value of args field in
`jobmanager-application.yaml` to be args: ["standalone-job", "--python",
"my_python_app.py", <optional arguments>, <job arguments>] the job manager
will try to submit a PyFlink job with the specified python file once it is
started. You can check the pod status for jobmanger and taskmanger via
`kubectl get pods [-n namespace]`. The job manger pod will turn to the
completed state once the job is finished or error state if there is
something wrong, while the task manger pod will always be in the running
state.

Finally, it requires you to tear down the cluster by deleting all created
resources (jobmanger/taskmanger jobs, flink-conf configmap,
jobmanger-service, etc).

Best,
Shuiqiang



Kevin Lam <ke...@shopify.com> 于2021年3月6日周六 上午5:29写道:

> Hello everyone,
>
> I'm looking to run a Pyflink application run in a distributed fashion,
> using kubernetes, and am currently facing issues. I've successfully gotten
> a Scala Flink Application to run using the manifests provided at [0]
>
> I attempted to run the application by updating the jobmanager command args
> from
>
>  args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]
>
> to
>
> args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]
>
> But this didn't work. It resulted in the following error:
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
> org.apache.commons.cli.Options. A different class with the same name was
> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
> module of loader 'app'
>
> I was able to get things to 'run' by setting args to:
>
> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>
>
> But I'm not sure if things were running in a distributed fashion or not.
>
> 1/ Is there a good way to check if the task pods were being correctly
> utilized?
>
> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
> kubernetes?
>
> Open to any suggestions you may have. Note: we'd prefer not to run using
> the native K8S route outlined at [1] because we need to maintain the
> ability to customize certain aspects of the deployment (eg. mounting SSDs
> to some of the pods)
>
> Thanks in advance!
>
> [0]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>
>