Posted to user@flink.apache.org by yidan zhao <hi...@gmail.com> on 2023/01/12 10:48:03 UTC

Re: Unable to list jobs in flink cluster with multiple jobManagers

I think it is a bug: https://issues.apache.org/jira/browse/FLINK-25732
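
The stack trace gives the cause away: the result of requestMultipleJobDetails contains a HashMap values view (java.util.HashMap$Values), which does not implement Serializable. As far as I can tell, the result only has to be serialized when the REST server leader and the Dispatcher leader end up on different JobManagers, which is exactly the split you see after a failover, so a single-leader setup never hits it. A minimal sketch of the failure mode in plain Java (just an illustration, not Flink's actual code):

    import java.io.ByteArrayOutputStream;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    public class ValuesViewSerialization {
        public static void main(String[] args) throws Exception {
            Map<String, String> jobs = new HashMap<>();
            jobs.put("job-1", "RUNNING");

            // HashMap.values() is a live view (java.util.HashMap$Values)
            // that does not implement Serializable, so writing it fails.
            try (ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(jobs.values());
            } catch (NotSerializableException e) {
                System.out.println("values view fails: " + e);
            }

            // Copying the view into a concrete collection is enough to
            // make the result serializable.
            try (ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(new ArrayList<>(jobs.values()));
                System.out.println("copied list serializes fine");
            }
        }
    }

The ticket lists the releases that contain the fix, so upgrading past one of them should be the way to get rid of this without manual intervention.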

Yael Adsl <ya...@gmail.com> wrote on Mon, Dec 12, 2022 at 23:56:
>
> Hi,
>
> We are running a Flink cluster (Flink version 1.14.3) on Kubernetes with high-availability.type: kubernetes. We have 3 jobmanagers. When we send jobs to the Flink cluster, we run a "flink list --jobmanager flink-jobmanager:8081" command as part of the process.
>
> At first, we could run this command successfully from the CLI of any of the jobmanagers.
> But after the elected leader is deleted (for whatever reason, e.g. a server failure), the configmaps with the following naming pattern are updated with the new leader IP address:
> flink-cluster-ecb24e88d60bb06917da1c933785811a-jobmanager-leader
> flink-cluster-b4bef19e6481a6c42340e51b69e30923-jobmanager-leader
> ...
>
> But the following configmaps are not always updated with the same IP address as the others:
> flink-cluster-restserver-leader
> flink-cluster-resourcemanager-leader
> flink-cluster-dispatcher-leader
>
> Then, when we run the flink list command, we receive the error attached at the end of this mail.
> If we delete the jobmanager that the flink-cluster-restserver-leader configmap points to, the configmap gets updated to the same IP address as the other configmaps, and the "flink list" command succeeds.
> Note: I can see in the log that the command attempts to connect to the IP that is set in the flink-cluster-restserver-leader configmap.
>
> How do we fix this issue without needing any manual intervention?
>
> Thanks,
> Yael
>
> Error from CLI when running flink list command:
>
> root@flink-jobmanager-68b5fb748d-wwmvt:/opt/flink# flink list --jobmanager localhost:8081
> 2022-12-12 15:04:19,037 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for xi-env/flink-cluster-restserver-leader, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e412c
> Waiting for response...
> 2022-12-12 15:04:21,231 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-restserver-leader'}.
> 2022-12-12 15:04:21,233 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for xi-env/flink-cluster-restserver-leader, watching id:fe7b3bff-1d4f-4e3e-bcf8-26afd74e412c
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Failed to retrieve job list.
>         at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449)
>         at org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430)
>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>         at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestMultipleJobDetails.
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)
>         at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
>         at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858)
>         at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
>         at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
>         at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
>         at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
>         at akka.dispatch.OnComplete.internal(Future.scala:300)
>         at akka.dispatch.OnComplete.internal(Future.scala:297)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>         at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
>         at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>         at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>         at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>         at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
>         at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
>         at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
>         at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
>         at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
>         at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>         at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
>         at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
>         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
>         at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>         at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>         at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>         at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>         at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>         at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused by: java.io.NotSerializableException: java.util.HashMap$Values
>         at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>         at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
>         at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
>         at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
>         at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>         at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)
>         ... 54 more
>
> End of exception on server side>]
>         at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532)
>         at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512)
>         at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
>         at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>         at java.base/java.lang.Thread.run(Unknown Source)
>