Posted to user@flink.apache.org by Burcu Gul POLAT EGRI <be...@sdt.com.tr> on 2022/03/25 13:37:38 UTC

"Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

I get the error below when I try to run the sample from the Flink documentation, Native Kubernetes <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/>.

I managed to run the first command in the documentation by adding some extra parameters, with the help of this post <https://cloudolife.com/2020/12/12/Cloud-Native/BIg-Data/Flink/Deploy-a-Apache-Flink-session-cluster-natively-on-Kubernetes-K8S/>.

user@local:~/flink-1.14.4$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.namespace=sdt-dproc-flink-test \
-Dkubernetes.config.file=/home/devuser/.kube/config \
-Dkubernetes.jobmanager.service-account=flink-service-account

After running the command above, I listed the new pod:

user@local:~/flink-1.14.4$ kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
dproc-example-flink-cluster-id-68c79bf67-mwh52   1/1     Running   0          1m

Then I ran the command below to submit the example job.

user@local:~/flink-1.14.4$ ./bin/flink run --target kubernetes-session \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
-Dkubernetes.namespace=sdt-dproc-flink-test \
-Dkubernetes.config.file=/home/devuser/.kube/config \
examples/batch/WordCount.jar --input /home/user/sometexts.txt --output /tmp/flinksample

After a while, I received the logs below:

2022-03-25 12:38:00,538 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster dproc-example-flink-cluster-id successfully, JobManager Web Interface: http://10.150.140.248:8081

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:131)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
    ... 16 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:399)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:262)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:395)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
    ... 8 more

From the last part of this error, I understand that the JobManager Web Interface URL is wrong, because when I check the Kubernetes services the port is different.

user@local:~/flink-1.14.4$ kubectl get svc
NAME                                  TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
dproc-example-flink-cluster-id        ClusterIP      None            <none>        6123/TCP,6124/TCP   6h32m
dproc-example-flink-cluster-id-rest   LoadBalancer   10.97.100.197   <pending>     8081:30976/TCP      6h32m

The port should be 30976 rather than 8081. I have already tried setting rest.port to this value, both in flink-conf.yaml and as a command-line parameter, but nothing changed; I always get this error.

How can I force the Flink client to use the correct JobManager URL?

Burcu
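
Editor's note: for a LoadBalancer or NodePort service, the PORT(S) column of `kubectl get svc` is normally read as port:nodePort/protocol. The small sketch below only splits the value copied from the listing above; the variable names are made up for illustration, and it says nothing about which address the Flink client should use.

```shell
# PORT(S) value copied from the `kubectl get svc` listing in this message.
ports="8081:30976/TCP"

# Drop the protocol suffix, leaving "8081:30976".
mapping="${ports%/*}"

# Left of the colon: the port exposed on the service itself.
service_port="${mapping%%:*}"

# Right of the colon: the port opened on the cluster nodes.
node_port="${mapping##*:}"

echo "service port: ${service_port}"
echo "node port:    ${node_port}"
```

Read this way, 8081 is the REST port of the service and 30976 is the port that would be reachable on a node address.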

This email and its contents may contain information that is privileged and confidential. If you are not an intended recipient, or the agent responsible for delivering this email to the intended recipient, you are hereby notified that any use, dissemination, distribution, or copying of this communication is strictly prohibited and may be unlawful. If you received this email in error, please notify the sender by replying to this email and delete the email sent in error. Personal opinions presented in this e-mail message are solely those of the author and do not necessarily represent SDT A.S.'s formal and authorized views.

RE: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

Posted by Burcu Gul POLAT EGRI <be...@sdt.com.tr>.
Thank you for the quick reply, Yang.

You are right, the configuration parameter was set as shown below. When I removed the parameter and ran the sample again, the task manager pods disappeared after the job finished.

-Dresourcemanager.taskmanager-timeout=3600000

Best regards,
Burcu
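
Editor's note: a quick unit check may help explain the behaviour described above. It assumes that resourcemanager.taskmanager-timeout is given in milliseconds; the thread does not state the unit, but the 30 s default mentioned in the quoted reply below would then correspond to 30000. The variable names are illustrative only.

```shell
# Value passed with -Dresourcemanager.taskmanager-timeout in the session command.
timeout_ms=3600000

# The 30 s default from the quoted reply, written in milliseconds (assumed unit).
default_ms=30000

timeout_min=$((timeout_ms / 60000))
default_s=$((default_ms / 1000))

echo "configured idle timeout: ${timeout_min} minutes"
echo "default idle timeout:    ${default_s} seconds"
```

Under that assumption, the original session kept idle task managers for an hour, which would match pods that seemed never to go away after a short batch job.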

From: Yang Wang [mailto:danrtsey.wy@gmail.com]
Sent: Tuesday, March 29, 2022 9:04 AM
To: Burcu Gul POLAT EGRI <be...@sdt.com.tr>
Cc: user@flink.apache.org
Subject: Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

By default, an idle TaskManager is released after 30 s (configured via "resourcemanager.taskmanager-timeout").
If it is not removed, you need to check the JobManager logs for the root cause. Maybe it does not have enough permissions, or something else is wrong.

Best,
Yang

Burcu Gul POLAT EGRI <be...@sdt.com.tr> wrote on Tuesday, March 29, 2022 at 13:15:
Thank you. I tried the first suggestion and the sample job ran successfully (the last command I ran is shown below).

But I have another question. After I ran the command below, a new task manager pod was created as expected, but it was not removed automatically after the job completed. With native Kubernetes, I expected the task manager pod to disappear after job completion.
Do you have any comments on this? Is there any other configuration for task manager pod removal?


./bin/flink run --target kubernetes-session -Dkubernetes.service-account=flink-service-account -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=dproc-example-flink-cluster-id -Dkubernetes.namespace=sdt-dproc-flink-test -Dkubernetes.config.file=/home/devuser/.kube/config examples/batch/WordCount.jar

Best regards,
Burcu

From: Yang Wang [mailto:danrtsey.wy@gmail.com]
Sent: Saturday, March 26, 2022 7:48 AM
To: Burcu Gul POLAT EGRI <be...@sdt.com.tr>
Cc: user@flink.apache.org
Subject: Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

The root cause might be that the LoadBalancer does not really work in your environment. We already have a ticket to track this [1] and will try to get it resolved in the next release.

For now, could you please try adding "-Dkubernetes.rest-service.exposed.type=NodePort" to your session and submission commands?

You may also be interested in the new flink-kubernetes-operator project [2]. It should make it easier to run a Flink application on K8s.

[1]. https://issues.apache.org/jira/browse/FLINK-17231
[2]. https://github.com/apache/flink-kubernetes-operator

Best,
Yang
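
Editor's note: the suggestion above could be applied roughly as in the sketch below, which adds the NodePort flag to both commands. All option values are copied from this thread; the variable names are illustrative. The script only prints the commands (a dry run). Whether an already running session has to be stopped and started again for the flag to take effect is not stated in the thread.

```shell
# Flag suggested in the reply above; it goes on BOTH commands.
expose="-Dkubernetes.rest-service.exposed.type=NodePort"

# Options shared by the two commands, taken from earlier in this thread.
common="-Dkubernetes.cluster-id=dproc-example-flink-cluster-id -Dkubernetes.namespace=sdt-dproc-flink-test -Dkubernetes.config.file=/home/devuser/.kube/config"

# Command that starts the session cluster.
session_cmd="./bin/kubernetes-session.sh $common $expose -Dkubernetes.jobmanager.service-account=flink-service-account"

# Command that submits the WordCount example to that session.
submit_cmd="./bin/flink run --target kubernetes-session $common $expose -Dkubernetes.service-account=flink-service-account examples/batch/WordCount.jar"

# Dry run: print the commands for review; run them from the Flink home directory.
echo "$session_cmd"
echo "$submit_cmd"
```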


Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

Posted by Yang Wang <da...@gmail.com>.
By default, an idle TaskManager is released after 30 s (configured via
"resourcemanager.taskmanager-timeout").
If it is not removed, you need to check the JobManager logs for the
root cause. Maybe it does not have enough permissions, or something else is wrong.

Best,
Yang
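
Editor's note: one possible way to follow this advice is sketched below. It assumes that the JobManager deployment is named after the cluster-id (the JobManager pod name earlier in this thread starts with it) and that the JobManager runs under the service account passed to the session command. The script only prints two standard kubectl commands: one to read the JobManager logs, one to ask whether that service account may delete pods.

```shell
# Values taken from the commands earlier in this thread.
ns="sdt-dproc-flink-test"
cluster_id="dproc-example-flink-cluster-id"
sa="flink-service-account"

# Assumption: the JobManager deployment carries the cluster-id as its name.
logs_cmd="kubectl logs deployment/$cluster_id -n $ns"

# Ask the API server whether the service account is allowed to delete pods.
perm_cmd="kubectl auth can-i delete pods -n $ns --as=system:serviceaccount:$ns:$sa"

# Dry run: print the commands; run them against the cluster by hand.
echo "$logs_cmd"
echo "$perm_cmd"
```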

>
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>
>     ... 16 more
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)
>
>     at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:399)
>
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:262)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:395)
>
>     ... 21 more
>
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
>
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
>
>     ... 19 more
>
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
>
>     ... 8 more
>
> I understand from the last part of this error that the JobManager Web
> Interface URL is wrong because when I check the Kubernetes services, port
> is different.
>
> user@local:~/flink-1.14.4$ kubectl get svc
>
> NAME                                  TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
>
> dproc-example-flink-cluster-id        ClusterIP      None            <none>        6123/TCP,6124/TCP   6h32m
>
> dproc-example-flink-cluster-id-rest   LoadBalancer   10.97.100.197   <pending>     8081:30976/TCP      6h32m
>
> The port should be 30976 rather that 8081. I have already tried to edit
> rest.port in flink-conf.yaml with this value and also as parameter from
> command line. But nothing changed. Always I get this error.
>
> How can I force Flink client to access correct JobManager URL.
>
>
>
> *Burcu *
>
>
>

RE: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

Posted by Burcu Gul POLAT EGRI <be...@sdt.com.tr>.
Thank you. I tried the first suggestion and the sample job ran successfully (the last command I executed is shown below).

But I have another question. After I run the command below, a new TaskManager pod is created as expected, but it is not removed automatically once the job completes. With native Kubernetes, I expected the TaskManager pod to disappear after the job finishes.
Do you have any comments on this? Is there any other configuration for TaskManager pod removal?


./bin/flink run --target kubernetes-session \
  -Dkubernetes.service-account=flink-service-account \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
  -Dkubernetes.namespace=sdt-dproc-flink-test \
  -Dkubernetes.config.file=/home/devuser/.kube/config \
  examples/batch/WordCount.jar

Best regards,
Burcu
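
A possible explanation for the lingering TaskManager pod, offered as an assumption and not confirmed in this message: the session cluster in the original post was started with -Dresourcemanager.taskmanager-timeout=3600000. That option is the time after which an idle TaskManager is released, in milliseconds (the Flink configuration reference lists 30000 as the default), so with the value above the pod may stay around for an hour after the job finishes. A minimal sketch of the arithmetic:

```shell
# Value passed to kubernetes-session.sh in the original message (milliseconds).
timeout_ms=3600000

# Convert to minutes: 60000 ms per minute.
timeout_min=$(( timeout_ms / 60000 ))

echo "Idle TaskManagers would be released after about ${timeout_min} minutes"

# Assumption: restarting the session with a smaller value, for example
#   -Dresourcemanager.taskmanager-timeout=60000
# should release idle TaskManager pods roughly one minute after the job ends.
```

Whether a shorter timeout is desirable depends on how often jobs are submitted, since a released TaskManager has to be started again for the next job.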

Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

Posted by Yang Wang <da...@gmail.com>.
The root cause might be that the LoadBalancer does not really work in your
environment. We already have a ticket to track this [1] and will try to get
it resolved in the next release.
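
For reference, the service listing from the original message can be read as in the sketch below. It only parses the line that was posted there; nothing is queried from a cluster. In `kubectl get svc` output, an EXTERNAL-IP of `<pending>` means no load balancer address has been assigned, and a PORT(S) entry of the form `port:nodePort/protocol` means 8081 is the service port and 30976 is the node port.

```shell
# Service line copied from the `kubectl get svc` output in the original message.
svc_line='dproc-example-flink-cluster-id-rest   LoadBalancer   10.97.100.197   <pending>     8081:30976/TCP      6h32m'

# Columns: NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
external_ip=$(printf '%s\n' "$svc_line" | awk '{print $4}')
ports=$(printf '%s\n' "$svc_line" | awk '{print $5}')

# Split "8081:30976/TCP" into its parts.
service_port=${ports%%:*}      # text before the first ':'
node_port=${ports#*:}          # text after the first ':'
node_port=${node_port%%/*}     # drop the "/TCP" suffix

echo "external IP:  $external_ip"
echo "service port: $service_port"
echo "node port:    $node_port"
```

So 8081 is not a wrong port as such. It is the port of the REST service, which is only reachable from outside the cluster through a working load balancer address or through the node port on a cluster node.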

For now, could you please try adding
"-Dkubernetes.rest-service.exposed.type=NodePort" to your session and
submission commands?
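
A sketch of what the two commands might look like with that option added. The cluster id, namespace, kubeconfig path and service account are the values from the original message; the memory, CPU, slot and timeout options are left out for brevity. The `run` helper and the `DRY_RUN` variable are hypothetical and exist only so the sketch prints the commands by default instead of executing them.

```shell
# Hypothetical helper: prints the command by default; set DRY_RUN=0 to execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Options used by both commands; values are taken from the original message.
# Kept as a plain string (no value contains spaces) so it also works in POSIX sh.
COMMON_OPTS="-Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
-Dkubernetes.namespace=sdt-dproc-flink-test \
-Dkubernetes.config.file=/home/devuser/.kube/config \
-Dkubernetes.rest-service.exposed.type=NodePort"

# 1. Start the session cluster with the REST service exposed as NodePort.
#    Word splitting of $COMMON_OPTS is intended here.
run ./bin/kubernetes-session.sh $COMMON_OPTS \
  -Dkubernetes.jobmanager.service-account=flink-service-account

# 2. Submit the example job against the same cluster.
run ./bin/flink run --target kubernetes-session $COMMON_OPTS \
  examples/batch/WordCount.jar
```

Run from the Flink distribution directory with DRY_RUN=0 to execute the commands for real.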

You may also be interested in the new flink-kubernetes-operator
project [2]. It should make it easier to run a Flink application on K8s.

[1]. https://issues.apache.org/jira/browse/FLINK-17231
[2]. https://github.com/apache/flink-kubernetes-operator

Best,
Yang

On Fri, Mar 25, 2022 at 21:39, Burcu Gul POLAT EGRI <be...@sdt.com.tr> wrote:

> I am getting the following error when I try to execute sample at Flink
> documentation - Native Kubernetes
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/>
> .
>
> I have succedded to execute the first command in documentation by adding
> some extra parameters with the help of this post
> <https://cloudolife.com/2020/12/12/Cloud-Native/BIg-Data/Flink/Deploy-a-Apache-Flink-session-cluster-natively-on-Kubernetes-K8S/>
> .
>
> user@local:~/flink-1.14.4$ ./bin/kubernetes-session.sh \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dtaskmanager.memory.process.size=4096m \
>
> -Dkubernetes.taskmanager.cpu=2 \
>
> -Dtaskmanager.numberOfTaskSlots=4 \
>
> -Dresourcemanager.taskmanager-timeout=3600000 \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config \
>
> -Dkubernetes.jobmanager.service-account=flink-service-account
>
> After executing above command, I have listed the new pod like below.
>
> user@local:~/flink-1.14.4$ kubectl get pods
>
> NAME                                             READY   STATUS    RESTARTS   AGE
>
> dproc-example-flink-cluster-id-68c79bf67-mwh52   1/1     Running   0          1m
>
> Then, I have executed the below command to submit example job.
>
> user@local:~/flink-1.14.4$ ./bin/flink run --target kubernetes-session \
>
> -Dkubernetes.service-account=flink-service-account \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config
>
> examples/batch/WordCount.jar --input /home/user/sometexts.txt --output /tmp/flinksample
>
> After a while, I received below logs:
>
> 2022-03-25 12:38:00,538 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster dproc-example-flink-cluster-id successfully, JobManager Web Interface: http://10.150.140.248:8081
>
>
>
> ------------------------------------------------------------
>
>  The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
>
>     at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:131)
>
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
>
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>
>     ... 8 more
>
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>
>     ... 16 more
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)
>
>     at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:399)
>
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:262)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:395)
>
>     ... 21 more
>
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
>
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>
>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
>
>     ... 19 more
>
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: /10.150.140.248:8081
>
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
>
>     ... 8 more
>
> I understand from the last part of this error that the JobManager Web
> Interface URL is wrong, because when I check the Kubernetes services the
> port is different.
>
> user@local:~/flink-1.14.4$ kubectl get svc
> NAME                                  TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
> dproc-example-flink-cluster-id        ClusterIP      None            <none>        6123/TCP,6124/TCP   6h32m
> dproc-example-flink-cluster-id-rest   LoadBalancer   10.97.100.197   <pending>     8081:30976/TCP      6h32m
>
> The port should be 30976 rather than 8081. I have already tried setting
> rest.port to this value, both in flink-conf.yaml and as a command-line
> parameter, but nothing changed. I always get this error.
>
> How can I force the Flink client to use the correct JobManager URL?
>
>
>
> *Burcu *
>
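A note on the PORT(S) column in the quoted `kubectl get svc` output: an entry such as 8081:30976/TCP reads as <service-port>:<node-port>/<protocol>. Port 8081 is what the service exposes on its cluster IP (or load balancer address, once one is assigned), while 30976 is the node port opened on the cluster nodes. Which of the two is correct therefore depends on the address being contacted. The sketch below only splits such an entry into its parts; the function name split_port_entry is made up for illustration and is not part of kubectl or Flink.

```shell
#!/usr/bin/env bash
# Split one entry of the PORT(S) column printed by `kubectl get svc`.
# Assumed entry format: <service-port>[:<node-port>]/<protocol>
# split_port_entry is a hypothetical helper, not a kubectl or Flink command.

split_port_entry() {
  local entry="$1"
  local mapping="${entry%%/*}"        # drop "/TCP"       -> "8081:30976"
  local protocol="${entry##*/}"       # keep after "/"    -> "TCP"
  local service_port="${mapping%%:*}" # keep before ":"   -> "8081"
  local node_port=""
  if [[ "$mapping" == *:* ]]; then
    node_port="${mapping##*:}"        # keep after ":"    -> "30976"
  fi
  echo "service_port=${service_port} node_port=${node_port:-none} protocol=${protocol}"
}

split_port_entry "8081:30976/TCP"
split_port_entry "6123/TCP"
```

For the two services listed above, this shows that only the -rest service has a node port at all; the headless ClusterIP service exposes 6123 and 6124 without one.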