You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Robert Cullen <ci...@gmail.com> on 2021/03/04 18:58:03 UTC

PyFlink Connection Refused to Kubernetes Session Cluster

I am attempting to run the word_count.py example on my Kubernetes (session)
cluster:

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=cmdaa \
-Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
--pyModule word_count \
--pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py

The following exception occurs:

2021-03-04 12:51:41,465 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] -
Retrieve flink cluster cmdaa successfully, JobManager Web Interface:
http://spackler:8081
Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py",
line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py",
line 85, in _run_code
    exec(code, run_globals)
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py",
line 80, in <module>
    word_count()
  File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py",
line 74, in word_count
    t_env.execute("word_count")
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1276, in execute
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.lang.RuntimeException: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:352)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:349)
    ... 12 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been
exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: spackler/192.168.88.200:8081
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: spackler/192.168.88.200:8081
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Do I need to create an ingress for PyFlink?

Robert Cullen
240-475-4490

Re: PyFlink Connection Refused to Kubernetes Session Cluster

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Robert,

It seems the retrieved JobManager address is a cluster-internal IP that
can only be accessed from inside the cluster. As you said, you may need to
create an ingress to expose the JobManager service so that the client can
access it from outside the k8s cluster.

Best,
Shuiqiang

On Fri, Mar 5, 2021 at 2:58 AM, Robert Cullen <ci...@gmail.com> wrote:

> Attempting to run the word_count.py example on my kubernetes (session)
> cluster:
>
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=cmdaa \
> -Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
> --pyModule word_count \
> --pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py
>
> The following exception occurs:
>
> 2021-03-04 12:51:41,465 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster cmdaa successfully, JobManager Web Interface: http://spackler:8081
> Traceback (most recent call last):
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 80, in <module>
>     word_count()
>   File "/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py", line 74, in word_count
>     t_env.execute("word_count")
>   File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1276, in execute
>   File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>   File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>     at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:352)
>     at org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:349)
>     ... 12 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     ... 1 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>     ... 21 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 19 more
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: spackler/192.168.88.200:8081
> Caused by: java.net.ConnectException: Connection refused
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748)
>
> org.apache.flink.client.program.ProgramAbortException
>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Do I need to create an ingress for pyflink?
>
> Robert Cullen
> 240-475-4490
>