Posted to user@flink.apache.org by Gorjan Todorovski <go...@gmail.com> on 2021/08/14 14:37:33 UTC

Running Beam on a native Kubernetes Flink cluster

Hi!

I need help setting up a native Kubernetes Flink cluster to run batch jobs
(launched by TensorFlow Extended). I am not sure I am configuring it
correctly: I have issues running jobs on more than one task manager, while
jobs run fine if there is only one TM.

I use the following parameters for the job:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        f"--environment_config={beam_sdk_url}:50000",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",


I have configured the Beam workers to run as sidecars to the TM
containers. I do this by configuring a task manager template for the pods
with this option:

kubernetes.pod-template-file.taskmanager

It points to a template file with the following contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
     #hostNetwork: true
     containers:
      - name: flink-main-container
        #image: apache/flink:scala_2.12
        env:
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
          - name: PYTHONPATH
            value: "/data/flink/src"
        args: ["taskmanager"]
        ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122 #22
          initialDelaySeconds: 30
          periodSeconds: 60
      - name: beam-worker-pool
        env:
          - name: PYTHONPATH
            value: "/data/flink/src"
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
        image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
        imagePullPolicy: Always
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60

I have also created a Kubernetes load balancer for the task managers, so
that clients can connect on port 50000. I use that address when configuring:

f"--environment_config={beam_sdk_url}:50000",

The problem is that the Beam SDK harness on one task manager seems to want
to connect to an endpoint running on the other task manager, but looks for
it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
    caused by:
context deadline exceeded

The provision endpoint that is actually listening on port 33705 is the one
on TM 1, while the harness on TM 2 looks for it on localhost, so it cannot
connect.

Here is how I tested this:

...............

TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
    caused by:
context deadline exceeded

TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
    caused by:
context deadline exceeded
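
To compare the endpoints each worker was told to use, the "Starting worker
with command" lines in logs like the ones above can be parsed mechanically.
The following is only a rough sketch: the function name parse_endpoints and
the regular expressions are illustrative assumptions, not part of Beam, and
it assumes the command is logged as a Python list literal, as it is in these
logs.

```python
import ast
import re

# Captures the list literal printed after "Starting worker with command".
COMMAND_RE = re.compile(r"Starting worker with command (\[.*?\])", re.DOTALL)
# Matches arguments such as --provision_endpoint=localhost:40983.
ENDPOINT_RE = re.compile(r"--(\w+)_endpoint=([^:]+):(\d+)")


def parse_endpoints(log_text):
    """Return {endpoint_name: (host, port)} found in a worker pool log."""
    match = COMMAND_RE.search(log_text)
    if match is None:
        return {}
    # The logged command is a Python list repr, so literal_eval can read it.
    command = ast.literal_eval(match.group(1))
    endpoints = {}
    for arg in command:
        m = ENDPOINT_RE.fullmatch(arg)
        if m:
            endpoints[m.group(1)] = (m.group(2), int(m.group(3)))
    return endpoints


# Log excerpt copied from the TM 1 output in this post.
TM1_LOG = """Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:33383',
'--artifact_endpoint=localhost:43477',
'--provision_endpoint=localhost:40983',
'--control_endpoint=localhost:34793']"""

if __name__ == "__main__":
    for name, (host, port) in parse_endpoints(TM1_LOG).items():
        print(f"{name}: {host}:{port}")
```

Running the same function over the TM 2 log makes it easy to see which
ports each pod was handed before probing them by hand.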

Testing:
.........................

TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output
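
The curl checks above only establish whether something accepts TCP
connections on a given port inside the pod. A minimal Python sketch of the
same check follows, assuming Python is available in the container; the
helper name can_connect is hypothetical, and the ports are the ones from the
logs in this post.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution errors.
        return False


if __name__ == "__main__":
    # Provision endpoint ports taken from the two worker pool logs.
    for port in (40983, 32907):
        state = "open" if can_connect("localhost", port) else "closed"
        print(f"localhost:{port} is {state}")
```

Like curl, this does not speak the gRPC protocol the endpoints use; it only
shows on which pod a port is actually being listened on.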

Not sure how to fix this.

Thanks, Gorjan

Re: Running Beam on a native Kubernetes Flink cluster

Posted by Gorjan Todorovski <go...@gmail.com>.
Yes! That did it. I changed it to localhost and everything works fine now.
I was wrong in thinking that it would need to connect to the Beam SDK worker
from my client machine, which is why I added the load balancer.

Thank you Jan!

On Sun, 15 Aug 2021 at 16:45, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Gorjan,
>
> The localhost address is hard-coded in the Python worker pool (see
> [1]). There should be no need to set up a load balancer for the worker_pool.
> If you have it as another container in each TM pod, it should suffice to
> replace {beam_sdk_url} with 'localhost'. Each TM will then have its own
> worker_pool, which should be just fine.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81

Re: Running Beam on a native Kubernetes Flink cluster

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Gorjan,

The localhost address is hard-coded in the Python worker pool (see
[1]). There should be no need to set up a load balancer for the
worker_pool. If you have it as another container in each TM pod, it
should suffice to replace {beam_sdk_url} with 'localhost'. Each TM will
then have its own worker_pool, which should be just fine.
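
As a minimal sketch of that change, assuming the worker pool runs as a
sidecar container in every TM pod, the job parameters from the original
message could be built like this. The helper build_beam_args and the host
name "flink-jobmanager" are hypothetical; only the flags themselves come
from this thread.

```python
def build_beam_args(flink_url, worker_pool_host="localhost",
                    worker_pool_port=50000):
    """Build the job parameters from the thread for a sidecar worker pool."""
    return [
        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        # Each task manager reaches the worker pool inside its own pod,
        # so no load balancer address is needed here.
        f"--environment_config={worker_pool_host}:{worker_pool_port}",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
    ]


if __name__ == "__main__":
    # "flink-jobmanager" is a placeholder for the real Flink master address.
    for arg in build_beam_args("flink-jobmanager"):
        print(arg)
```

The only difference from the original parameters is the host in
--environment_config, which now resolves inside each TM pod.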

Best,

  Jan

[1] 
https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81
