Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2021/03/01 10:45:27 UTC

Re: Flink’s Kubernetes HA services - NOT working

Hi Omer,

thanks for the logs. Could you tell us a bit more about the concrete setup
of your Flink K8s cluster? It looks to me as if the ResourceManager cannot
talk to the JobMaster which tries to register at the RM. Also some
JobMasters don't seem to reach the ResourceManager. Could it be that you
are running standby JobManager processes? If this is the case, then using a
K8s service for the communication between Flink components will not work.

Cheers,
Till

On Sun, Feb 28, 2021 at 11:29 AM Omer Ozery <om...@gmail.com> wrote:

> Sorry for the late reply.
> I attached to this mail 3 types of logs taken from the jobmanager.
>
> 1. flink-jobmanager with log level info - when nothing is working the
> minute we try to deploy the jobs (even the UI's jobs overview is stuck)
> 3. flink-jobmanager with log level debug - when nothing is working the
> minute we try to deploy the jobs (even the UI's jobs overview is stuck)
> 2. flink-jobmanager with log level info with 1 successful job - you can
> see that it is started and dealing with leadership and checkpoints
> properly.
>
> you can see that everything works fine when the cluster is starting with
> no jobs
> all task managers are registered and communicating with the jobmanager
> with no problems.
>
> * BTW, Flink has this problem: when some jobs are stuck in scheduling, the
> jobmanager's running-jobs UI overview is stuck as well, and you can't see
> any jobs running even if there are some. It happened in earlier versions
> also, 1.9, 1.11...
>
> Thanks
> Omer
>
>
>
> On Thu, Feb 18, 2021 at 4:22 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Omer,
>>
>> could you share a bit more of the logs with us? I would be interested in
>> what has happened before "Stopping DefaultLeaderRetrievalService" is
>> logged. One problem you might run into is FLINK-20417. This problem should
>> be fixed with Flink 1.12.2.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20417
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <om...@gmail.com> wrote:
>>
>>> Hey guys
>>> It looks like the Flink cluster is deployed successfully, it starts with
>>> no errors.
>>> But when we try to deploy the jobs, some jobs start and some
>>> can't find available slots for some reason, even when we have free ones.
>>> It happens with different jobs every time.
>>> Below are the exceptions thrown by the components.
>>> I also attached an image showing the taskmanagers and the free slots.
>>>
>>> *jobManager throws this error:*
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>> - Stopping DefaultLeaderRetrievalService.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver
>>> [] - Stopping
>>> KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
>>> [] - The watcher is closing.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
>>> Registration of job manager bec569547a4ab5be4e2068a28164415a
>>> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>> 2021-02-17 11:19:41,956 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster                 [] -
>>> Registration at ResourceManager was declined: java.lang.Exception: Job
>>> leader id service has been stopped.
>>>
>>>
>>> java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Slot request bulk is not fulfillable! Could not allocate the required slot
>>> within slot request timeout
>>> at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>> ~[?:?]
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>>> ~[?:?]
>>> at
>>> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> ~[?:?]
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> [flink-dist_2.11-1.12.1.jar:1.12.1]
>>> Caused by:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Slot request bulk is not fulfillable! Could not allocate the required slot
>>> within slot request timeout
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> ... 24 more
>>> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
>>> 300000 ms
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> ... 24 more
>>> 2021-02-17 11:18:18,977 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>>> Discarding the results produced by task execution
>>> 317da8a62cf57f74cf587e6bd733f5e7.
>>> 2021-02-17 11:18:18,977 INFO
>>>  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>>> [] - Calculating tasks to restart to recover the failed task
>>> e1ff02ab7657079c9d2254f12c031b2a_0.
>>> 2021-02-17 11:18:18,977 INFO
>>>  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>>> [] - 13 tasks should be restarted to recover the failed task
>>> e1ff02ab7657079c9d2254f12c031b2a_0.
>>>
>>> *The jobs throw this error:*
>>> 2021-02-17 14:13:48
>>> java.util.concurrent.CompletionException:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Slot request bulk is not fulfillable! Could not allocate the required slot
>>> within slot request timeout
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>>> at
>>> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>> at
>>> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>>> at
>>> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
>>> at
>>> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
>>> at
>>> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
>>> at
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by:
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Slot request bulk is not fulfillable! Could not allocate the required slot
>>> within slot request timeout
>>> at
>>> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
>>> ... 24 more
>>> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
>>> 300000 ms
>>> ... 25 more
>>>
>>> Any suggestions ?
>>>
>>> Thanks
>>> Omer
>>>
>>> On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> If you are running a session cluster, then Flink will create a config
>>>> map for every submitted job. These config maps will unfortunately only be
>>>> cleaned up when you shut down the cluster. This is a known limitation which
>>>> we want to fix soon [1, 2].
>>>>
>>>> If you can help us with updating the documentation properly (e.g. which
>>>> role binding to use for the service account with minimal permissions), then
>>>> we would highly appreciate your help.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20695
>>>> [2] https://issues.apache.org/jira/browse/FLINK-21008
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <om...@gmail.com> wrote:
>>>>
>>>>> Hey guys,
>>>>> You are right, the documentation lacks this part, and Flink needs
>>>>> it to start.
>>>>> I'm not sure it has 100% solved our problem, because it creates
>>>>> endless copies of the configmaps with random ids, and our jobs also
>>>>> can't be scheduled for some reason.
>>>>> I will investigate this further with Daniel and let you know.
>>>>> Also, the access control granted by this document is vast, imprecise
>>>>> and cluster-wide (it uses a default edit-all clusterRole), so when you
>>>>> create a PR, make sure that whoever is in charge of the flink-k8s
>>>>> integration documents the accurate permissions to create and attach to
>>>>> Flink's components.
>>>>>
>>>>> Thanks very much for your help!
>>>>> we will keep you updated.
>>>>> Omer
>>>>>
>>>>> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Omer,
>>>>>>
>>>>>> I think Matthias is right. The K8s HA services create and edit config
>>>>>> maps. Hence they need the rights to do this. In the native K8s
>>>>>> documentation there is a section about how to create a service account with
>>>>>> the right permissions [1].
>>>>>>
>>>>>> I think that our K8s HA documentation currently lacks this part. I
>>>>>> will create a PR to update the documentation.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
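>>>>>>
>>>>>> As a rough illustration only: a namespaced alternative to a
>>>>>> cluster-wide edit role could look like the sketch below, assuming the
>>>>>> HA services only need to manage ConfigMaps. The name flink-ha is
>>>>>> hypothetical, the namespace and the default service account are taken
>>>>>> from the error message quoted further down, and the verb list is an
>>>>>> assumption that has not been verified against Flink.
>>>>>>
>>>>>> ```yaml
>>>>>> # Sketch: grant ConfigMap access in a single namespace only.
>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>> kind: Role
>>>>>> metadata:
>>>>>>   name: flink-ha              # hypothetical name
>>>>>>   namespace: intel360-beta
>>>>>> rules:
>>>>>> - apiGroups: [""]             # core API group, where ConfigMaps live
>>>>>>   resources: ["configmaps"]
>>>>>>   # assumed verb list: read, watch, create, edit and delete
>>>>>>   verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>>>>>> ---
>>>>>> # Bind the role to the service account the Flink pods run as.
>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>> kind: RoleBinding
>>>>>> metadata:
>>>>>>   name: flink-ha              # hypothetical name
>>>>>>   namespace: intel360-beta
>>>>>> subjects:
>>>>>> - kind: ServiceAccount
>>>>>>   name: default               # account named in the error message
>>>>>>   namespace: intel360-beta
>>>>>> roleRef:
>>>>>>   apiGroup: rbac.authorization.k8s.io
>>>>>>   kind: Role
>>>>>>   name: flink-ha
>>>>>> ```
>>>>>>
>>>>>> Whether this is sufficient depends on the deployment mode; native
>>>>>> Kubernetes deployments also need to start and stop pods.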
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <ma...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm adding the Flink user ML to the conversation again.
>>>>>>>
>>>>>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <
>>>>>>> matthias@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Omer,
>>>>>>>> thanks for sharing the configuration. You're right: Using NFS for
>>>>>>>> HA's storageDir is fine.
>>>>>>>>
>>>>>>>> About the error message you're referring to: I haven't worked with
>>>>>>>> the HA k8s service, yet. But the RBAC is a good hint. Flink's native
>>>>>>>> Kubernetes documentation [1] points out that you can use a custom service
>>>>>>>> account. This one needs special permissions to start/stop pods
>>>>>>>> automatically (which does not apply in your case) but also to access
>>>>>>>> ConfigMaps. You might want to try setting the permission as described in
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>>>>>
>>>>>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <om...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Matthias.
>>>>>>>>> My name is Omer, I am Daniel's DevOps engineer, and I will elaborate
>>>>>>>>> on our Flink situation.
>>>>>>>>> These are our Flink resource definitions, as they are generated using
>>>>>>>>> the helm template command (minus log4j, metrics configuration and some
>>>>>>>>> sensitive data)
>>>>>>>>> ---
>>>>>>>>> # Source: flink/templates/flink-configmap.yaml
>>>>>>>>> apiVersion: v1
>>>>>>>>> kind: ConfigMap
>>>>>>>>> metadata:
>>>>>>>>>   name: flink-config
>>>>>>>>>   labels:
>>>>>>>>>     app: flink
>>>>>>>>> data:
>>>>>>>>>   flink-conf.yaml: |
>>>>>>>>>     jobmanager.rpc.address: flink-jobmanager
>>>>>>>>>     jobmanager.rpc.port: 6123
>>>>>>>>>     jobmanager.execution.failover-strategy: region
>>>>>>>>>     jobmanager.memory.process.size: 8g
>>>>>>>>>     taskmanager.memory.process.size: 24g
>>>>>>>>>     taskmanager.memory.task.off-heap.size: 1g
>>>>>>>>>     taskmanager.numberOfTaskSlots: 4
>>>>>>>>>     queryable-state.proxy.ports: 6125
>>>>>>>>>     queryable-state.enable: true
>>>>>>>>>     blob.server.port: 6124
>>>>>>>>>     parallelism.default: 1
>>>>>>>>>     state.backend.incremental: true
>>>>>>>>>     state.backend: rocksdb
>>>>>>>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>>>>>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>>>>>>>     classloader.resolve-order: child-first
>>>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>>>     kubernetes.namespace: intel360-beta
>>>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> # Source: flink/templates/flink-service.yaml
>>>>>>>>> apiVersion: v1
>>>>>>>>> kind: Service
>>>>>>>>> metadata:
>>>>>>>>>   name: flink-jobmanager
>>>>>>>>>   labels:
>>>>>>>>>     {}
>>>>>>>>> spec:
>>>>>>>>>   ports:
>>>>>>>>>   - name: http-ui
>>>>>>>>>     port: 8081
>>>>>>>>>     targetPort: http-ui
>>>>>>>>>   - name: tcp-rpc
>>>>>>>>>     port: 6123
>>>>>>>>>     targetPort: tcp-rpc
>>>>>>>>>   - name: tcp-blob
>>>>>>>>>     port: 6124
>>>>>>>>>     targetPort: tcp-blob
>>>>>>>>>   selector:
>>>>>>>>>     app: flink
>>>>>>>>>     component: jobmanager
>>>>>>>>> ---
>>>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>>>> apiVersion: apps/v1
>>>>>>>>> kind: Deployment
>>>>>>>>> metadata:
>>>>>>>>>   name: flink-jobmanager
>>>>>>>>> spec:
>>>>>>>>>   replicas: 1
>>>>>>>>>   selector:
>>>>>>>>>     matchLabels:
>>>>>>>>>       app: flink
>>>>>>>>>       component: jobmanager
>>>>>>>>>   template:
>>>>>>>>>     metadata:
>>>>>>>>>       labels:
>>>>>>>>>         app: flink
>>>>>>>>>         component: jobmanager
>>>>>>>>>       annotations:
>>>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>>>     spec:
>>>>>>>>>       containers:
>>>>>>>>>       - name: jobmanager
>>>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>>>         args: [ "jobmanager" ]
>>>>>>>>>         ports:
>>>>>>>>>         - name: http-ui
>>>>>>>>>           containerPort: 8081
>>>>>>>>>         - name: tcp-rpc
>>>>>>>>>           containerPort: 6123
>>>>>>>>>         - name: tcp-blob
>>>>>>>>>           containerPort: 6124
>>>>>>>>>         resources:
>>>>>>>>>           {}
>>>>>>>>>         # Environment Variables
>>>>>>>>>         env:
>>>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>>>           value: "true"
>>>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>>>           value: "flink-jobmanager"
>>>>>>>>>         volumeMounts:
>>>>>>>>>         - name: flink-config
>>>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>>>           subPath: flink-conf.yaml
>>>>>>>>>         # NFS mounts
>>>>>>>>>         - name: flink-checkpoints
>>>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>>>         - name: flink-recovery
>>>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>>>       volumes:
>>>>>>>>>       - name: flink-config
>>>>>>>>>         configMap:
>>>>>>>>>           name: flink-config
>>>>>>>>>       # NFS volumes
>>>>>>>>>       - name: flink-checkpoints
>>>>>>>>>         nfs:
>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>>>       - name: flink-recovery
>>>>>>>>>         nfs:
>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>>>> ---
>>>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>>>> apiVersion: apps/v1
>>>>>>>>> kind: Deployment
>>>>>>>>> metadata:
>>>>>>>>>   name: flink-taskmanager
>>>>>>>>> spec:
>>>>>>>>>   replicas: 7
>>>>>>>>>   selector:
>>>>>>>>>     matchLabels:
>>>>>>>>>       app: flink
>>>>>>>>>       component: taskmanager
>>>>>>>>>   template:
>>>>>>>>>     metadata:
>>>>>>>>>       labels:
>>>>>>>>>         app: flink
>>>>>>>>>         component: taskmanager
>>>>>>>>>       annotations:
>>>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>>>     spec:
>>>>>>>>>       containers:
>>>>>>>>>       - name: taskmanager
>>>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>>>         args: [ "taskmanager" ]
>>>>>>>>>         resources:
>>>>>>>>>           limits:
>>>>>>>>>             cpu: 6000m
>>>>>>>>>             memory: 24Gi
>>>>>>>>>           requests:
>>>>>>>>>             cpu: 6000m
>>>>>>>>>             memory: 24Gi
>>>>>>>>>         # Environment Variables
>>>>>>>>>         env:
>>>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>>>           value: "true"
>>>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>>>           value: "flink-jobmanager"
>>>>>>>>>         volumeMounts:
>>>>>>>>>         - name: flink-config
>>>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>>>           subPath: flink-conf.yaml
>>>>>>>>>         # NFS mounts
>>>>>>>>>         - name: flink-checkpoints
>>>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>>>         - name: flink-recovery
>>>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>>>       volumes:
>>>>>>>>>       - name: flink-config
>>>>>>>>>         configMap:
>>>>>>>>>           name: flink-config
>>>>>>>>>       # NFS volumes
>>>>>>>>>       - name: flink-checkpoints
>>>>>>>>>         nfs:
>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>>>       - name: flink-recovery
>>>>>>>>>         nfs:
>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>>>> ---
>>>>>>>>> # Source: flink/templates/flink-ingress.yaml
>>>>>>>>> apiVersion: extensions/v1beta1
>>>>>>>>> kind: Ingress
>>>>>>>>> metadata:
>>>>>>>>>   name: jobmanager
>>>>>>>>> spec:
>>>>>>>>>   rules:
>>>>>>>>>     - host: my.flink.job.manager.url
>>>>>>>>>       http:
>>>>>>>>>         paths:
>>>>>>>>>           - path: /
>>>>>>>>>             backend:
>>>>>>>>>               serviceName: flink-jobmanager
>>>>>>>>>               servicePort: 8081
>>>>>>>>> ---
>>>>>>>>>
>>>>>>>>> As you can see, we are using the skeleton of the standalone
>>>>>>>>> configuration as it is documented here:
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>>>>>>> with some per-company configuration obviously, but still under the
>>>>>>>>> scope of this document..
>>>>>>>>>
>>>>>>>>> on a normal beautiful day and without the HA configuration,
>>>>>>>>> everything works fine.
>>>>>>>>> when trying to configure kubernetes HA using this document:
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>> with the following parameters:
>>>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>>>
>>>>>>>>> the jobmanager fails with the following error:
>>>>>>>>> 2021-02-14 16:57:19,103 ERROR
>>>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>>>>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>>>> executing: GET at:
>>>>>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>>>> resource "configmaps" in API group "" in the namespace "default".
>>>>>>>>>
>>>>>>>>> So we added this line as well (as you can see in the flink-config
>>>>>>>>> configmap above):
>>>>>>>>> kubernetes.namespace: intel360-beta
>>>>>>>>> It is not part of the document, and I don't think Flink
>>>>>>>>> should be aware of the namespace it resides in, since that damages the
>>>>>>>>> modularity of the upper layers of configuration. Regardless, we added
>>>>>>>>> it and then got the following error:
>>>>>>>>>
>>>>>>>>> 2021-02-14 17:00:57,086 ERROR
>>>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>>>>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>>>> executing: GET at:
>>>>>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>>>> resource "configmaps" in API group "" in the namespace "intel360-beta".
>>>>>>>>>
>>>>>>>>> which is basically the same error message, just directed to
>>>>>>>>> Flink's namespace.
>>>>>>>>> My question is: do I need to add RBAC to Flink's service
>>>>>>>>> account? I got the impression from the official Flink documents and
>>>>>>>>> some blog responses that it is designed to function without any special
>>>>>>>>> permissions.
>>>>>>>>> If we do need RBAC, can you give an official documentation
>>>>>>>>> reference for the exact permissions?
>>>>>>>>>
>>>>>>>>> NOTE: as you can see our flink-checkpoints and recovery locations
>>>>>>>>> are directed to a local directory mounted to a shared NFS between all tasks
>>>>>>>>> and job manager, since our infrastructure is bare-metal by design.
>>>>>>>>> (although this one is hosted in AWS)
>>>>>>>>>
>>>>>>>>> thanks in advance
>>>>>>>>> Omer
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>> From: Daniel Peled <da...@gmail.com>
>>>>>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>>>>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>>>>>>>> To: <om...@gmail.com>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>> From: Matthias Pohl <ma...@ververica.com>
>>>>>>>>> Date: Thu, Feb 11, 2021 at 5:37 PM
>>>>>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>>>>>>>> To: Matthias Pohl <ma...@ververica.com>
>>>>>>>>> Cc: Daniel Peled <da...@gmail.com>, user <
>>>>>>>>> user@flink.apache.org>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> One other thing: It looks like you've set
>>>>>>>>> high-availability.storageDir to a local path file:///opt/flink/recovery.
>>>>>>>>> You should use a storage path that is accessible from all Flink cluster
>>>>>>>>> components (e.g. using S3). Only references are stored in Kubernetes
>>>>>>>>> ConfigMaps [1].
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <
>>>>>>>>> matthias@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Daniel,
>>>>>>>>>> what's the exact configuration you used? Did you use the resource
>>>>>>>>>> definitions provided in the Standalone Flink on Kubernetes docs [1]? Did
>>>>>>>>>> you do certain things differently in comparison to the documentation?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <
>>>>>>>>>> daniel.peled.work@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hey,
>>>>>>>>>>>
>>>>>>>>>>> We are using standalone Flink on Kubernetes,
>>>>>>>>>>> and we have followed the instructions in the following link:
>>>>>>>>>>> "Kubernetes HA Services"
>>>>>>>>>>>
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>>>> We were unable to make it work.
>>>>>>>>>>> We are facing a lot of problems.
>>>>>>>>>>> For example, some of the jobs don't start, complaining that there
>>>>>>>>>>> are not enough slots available, although there are enough slots, and it
>>>>>>>>>>> seems as if the job manager is NOT aware of all the task managers.
>>>>>>>>>>> In another scenario we were unable to run any job at all.
>>>>>>>>>>> The Flink dashboard is unresponsive and we get the error
>>>>>>>>>>> "flink service temporarily unavailable due to an ongoing leader
>>>>>>>>>>> election. please refresh".
>>>>>>>>>>> We believe we are missing some configurations.
>>>>>>>>>>> Are there any more detailed instructions?
>>>>>>>>>>> Any suggestions/tips?
>>>>>>>>>>> Attached is the log of the job manager from one of the attempts.
>>>>>>>>>>>
>>>>>>>>>>> Please give me some advice.
>>>>>>>>>>> BR,
>>>>>>>>>>> Danny
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>

Re: Flink’s Kubernetes HA services - NOT working

Posted by Till Rohrmann <tr...@apache.org>.
Hmm, this is strange. From the logs it looks as if certain messages between
components don't arrive at the receiver's end. I think we have to dig
further into the problem.

In order to narrow it down, could you try to start the cluster using pod
IPs instead of K8s services for inter-component communication? You can see
here how to configure it [1]. That way we can rule out the K8s service as
the cause of the problem.

[1] https://stackoverflow.com/a/66228073/4815083
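
For reference, a minimal sketch of the idea, assuming a JobManager
Deployment similar to the one in the standalone Kubernetes docs: the pod IP
is exposed to the container through the downward API and passed as the host
argument, which should take precedence over the jobmanager.rpc.address from
the config map. The POD_IP variable name is my choice, and the volumes,
ports and resources are left out for brevity.

```yaml
# Sketch only: adapt names and add your volumes, ports and resources.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.1-scala_2.11-java11
        env:
        # Expose this pod's IP to the container via the downward API.
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The second argument is used as the JobManager host, so the
        # process advertises its pod IP instead of the service name.
        args: ["jobmanager", "$(POD_IP)"]
```

With HA enabled, the TaskManagers should then pick up the JobManager
address from the leader ConfigMap rather than from the K8s service.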

Cheers,
Till

On Mon, Mar 1, 2021, 21:42 Omer Ozery <om...@gmail.com> wrote:

> Hey Till,
> these are our Flink resource definitions, as generated by the helm
> template command (minus the log4j and metrics configuration and some
> sensitive data)
> ---
> # Source: flink/templates/flink-configmap.yaml
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |
>     jobmanager.rpc.address: flink-jobmanager
>     jobmanager.rpc.port: 6123
>     jobmanager.execution.failover-strategy: region
>     jobmanager.memory.process.size: 8g
>     taskmanager.memory.process.size: 24g
>     taskmanager.memory.task.off-heap.size: 1g
>     taskmanager.numberOfTaskSlots: 4
>     queryable-state.proxy.ports: 6125
>     queryable-state.enable: true
>     blob.server.port: 6124
>     parallelism.default: 1
>     state.backend.incremental: true
>     state.backend: rocksdb
>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>     state.checkpoints.dir: file:///opt/flink/checkpoints
>     classloader.resolve-order: child-first
>     kubernetes.cluster-id: flink-cluster
>     kubernetes.namespace: intel360-beta
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///opt/flink/recovery
>
> ---
> # Source: flink/templates/flink-service.yaml
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   labels:
>     {}
> spec:
>   ports:
>   - name: http-ui
>     port: 8081
>     targetPort: http-ui
>   - name: tcp-rpc
>     port: 6123
>     targetPort: tcp-rpc
>   - name: tcp-blob
>     port: 6124
>     targetPort: tcp-blob
>   selector:
>     app: flink
>     component: jobmanager
> ---
> # Source: flink/templates/flink-deployment.yaml
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>       annotations:
>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>     spec:
>       containers:
>       - name: jobmanager
>         image: flink:1.12.1-scala_2.11-java11
>         args: [ "jobmanager" ]
>         ports:
>         - name: http-ui
>           containerPort: 8081
>         - name: tcp-rpc
>           containerPort: 6123
>         - name: tcp-blob
>           containerPort: 6124
>         resources:
>           {}
>         # Environment Variables
>         env:
>         - name: ENABLE_CHECKPOINTING
>           value: "true"
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: "flink-jobmanager"
>         volumeMounts:
>         - name: flink-config
>           mountPath: /opt/flink/conf/flink-conf.yaml
>           subPath: flink-conf.yaml
>         # NFS mounts
>         - name: flink-checkpoints
>           mountPath: "/opt/flink/checkpoints"
>         - name: flink-recovery
>           mountPath: "/opt/flink/recovery"
>       volumes:
>       - name: flink-config
>         configMap:
>           name: flink-config
>       # NFS volumes
>       - name: flink-checkpoints
>         nfs:
>           server: "my-nfs-server.my-org"
>           path: "/my-shared-nfs-dir/flink/checkpoints"
>       - name: flink-recovery
>         nfs:
>           server: "my-nfs-server.my-org"
>           path: "/my-shared-nfs-dir/flink/recovery"
> ---
> # Source: flink/templates/flink-deployment.yaml
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 7
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>       annotations:
>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>     spec:
>       containers:
>       - name: taskmanager
>         image: flink:1.12.1-scala_2.11-java11
>         args: [ "taskmanager" ]
>         resources:
>           limits:
>             cpu: 6000m
>             memory: 24Gi
>           requests:
>             cpu: 6000m
>             memory: 24Gi
>         # Environment Variables
>         env:
>         - name: ENABLE_CHECKPOINTING
>           value: "true"
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: "flink-jobmanager"
>         volumeMounts:
>         - name: flink-config
>           mountPath: /opt/flink/conf/flink-conf.yaml
>           subPath: flink-conf.yaml
>         # NFS mounts
>         - name: flink-checkpoints
>           mountPath: "/opt/flink/checkpoints"
>         - name: flink-recovery
>           mountPath: "/opt/flink/recovery"
>       volumes:
>       - name: flink-config
>         configMap:
>           name: flink-config
>       # NFS volumes
>       - name: flink-checkpoints
>         nfs:
>           server: "my-nfs-server.my-org"
>           path: "/my-shared-nfs-dir/flink/checkpoints"
>       - name: flink-recovery
>         nfs:
>           server: "my-nfs-server.my-org"
>           path: "/my-shared-nfs-dir/flink/recovery"
> ---
> # Source: flink/templates/flink-ingress.yaml
> apiVersion: extensions/v1beta1
> kind: Ingress
> metadata:
>   name: jobmanager
> spec:
>   rules:
>     - host: my.flink.job.manager.url
>       http:
>         paths:
>           - path: /
>             backend:
>               serviceName: flink-jobmanager
>               servicePort: 8081
> ---
>
> As you can see, we are using the skeleton of the standalone configuration
> as it is documented here:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
> with some company-specific configuration, obviously, but still within the
> scope of this document.
> And there is no standby JobManager.
>
> ✌️
>
> On Mon, Mar 1, 2021, 12:45 PM Till Rohrmann <tr...@apache.org> wrote:
>
>> Hi Omer,
>>
>> thanks for the logs. Could you tell us a bit more about the concrete
>> setup of your Flink K8s cluster? It looks to me as if the ResourceManager
>> cannot talk to the JobMaster which tries to register at the RM. Also some
>> JobMasters don't seem to reach the ResourceManager. Could it be that you
>> are running standby JobManager processes? If this is the case, then using
>> a K8s service for the communication between Flink components does not
>> work.
>>
>> Cheers,
>> Till
>>
>> On Sun, Feb 28, 2021 at 11:29 AM Omer Ozery <om...@gmail.com> wrote:
>>
>>> Sorry for the late reply.
>>> I attached to this mail 3 types of logs taken from the jobmanager.
>>>
>>> 1. flink-jobmanager with log level info - when nothing is working the
>>> minute we try to deploy the jobs (even the UI's jobs overview is stuck)
>>> 3. flink-jobmanager with log level debug - when nothing is working the
>>> minute we try to deploy the jobs (even the UI's jobs overview is stuck)
>>> 2. flink-jobmanager with log level info with 1 successful job - you can
>>> see that it is started and dealing with leadership and checkpoints
>>> properly.
>>>
>>> you can see that everything works fine when the cluster is starting with
>>> no jobs
>>> all task managers are registered and communicating with the jobmanager
>>> with no problems.
>>>
>>> * BTW, Flink has this problem: when some jobs are stuck in scheduling, the
>>> jobmanager's running-jobs UI overview is stuck as well, and you can't see any
>>> jobs running even if there are some. It happened in earlier versions too,
>>> 1.9, 1.11...
>>>
>>> Thanks
>>> Omer
>>>
>>>
>>>
>>> On Thu, Feb 18, 2021 at 4:22 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Omer,
>>>>
>>>> could you share a bit more of the logs with us? I would be interested
>>>> in what has happened before "Stopping DefaultLeaderRetrievalService" is
>>>> logged. One problem you might run into is FLINK-20417 [1]. This problem
>>>> should be fixed with Flink 1.12.2.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20417
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Feb 18, 2021 at 2:54 PM Omer Ozery <om...@gmail.com> wrote:
>>>>
>>>>> Hey guys,
>>>>> It looks like the Flink cluster is deployed successfully; it starts
>>>>> with no errors.
>>>>> But when we try to deploy the jobs, some jobs start and some
>>>>> can't find available slots for some reason, even when we have free ones.
>>>>> It happens with different jobs every time.
>>>>> Below are the exceptions thrown by the components.
>>>>> I also attached an image showing the taskmanagers and the free
>>>>> slots.
>>>>>
>>>>> *jobManager throws this error:*
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='flink-cluster-1796779318657734fcbc261f8d01d250-jobmanager-leader'}.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher [] - The watcher is closing.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager bec569547a4ab5be4e2068a28164415a@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager bec569547a4ab5be4e2068a28164415a@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager bec569547a4ab5be4e2068a28164415a@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registration of job manager bec569547a4ab5be4e2068a28164415a@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_19 failed.
>>>>> 2021-02-17 11:19:41,956 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Registration at ResourceManager was declined: java.lang.Exception: Job leader id service has been stopped.
>>>>>
>>>>>
>>>>> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>>>>> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
>>>>> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
>>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
>>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
>>>>> at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> ... 24 more
>>>>> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>>> ... 24 more
>>>>> 2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 317da8a62cf57f74cf587e6bd733f5e7.
>>>>> 2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.
>>>>> 2021-02-17 11:18:18,977 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 13 tasks should be restarted to recover the failed task e1ff02ab7657079c9d2254f12c031b2a_0.
>>>>>
>>>>> *The jobs throw this error:*
>>>>> 2021-02-17 14:13:48
>>>>> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>>>>> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>>>>> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>>>>> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
>>>>> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>>>> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>>>>> at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223)
>>>>> at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168)
>>>>> at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
>>>>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(Unknown Source)
>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>>>>> at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
>>>>> ... 24 more
>>>>> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
>>>>> ... 25 more
>>>>>
>>>>> Any suggestions ?
>>>>>
>>>>> Thanks
>>>>> Omer
>>>>>
>>>>> On Tue, Feb 16, 2021 at 6:54 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> If you are running a session cluster, then Flink will create a config
>>>>>> map for every submitted job. These config maps will unfortunately only be
>>>>>> cleaned up when you shut down the cluster. This is a known limitation which
>>>>>> we want to fix soon [1, 2].
>>>>>>
>>>>>> If you can help us with updating the documentation properly (e.g.
>>>>>> which role binding to use for the service account with minimal
>>>>>> permissions), then we would highly appreciate your help.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20695
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-21008
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <om...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey guys,
>>>>>>> You are right, the documentation lacks this part, and Flink
>>>>>>> needs it to start.
>>>>>>> I'm not sure it has 100% solved our problem, because it creates
>>>>>>> endless copies of the configmaps with random ids, and our jobs also can't
>>>>>>> be scheduled for some reason.
>>>>>>> I will investigate this further with Daniel and let you know.
>>>>>>> Also, the access control granted by following this document is vast,
>>>>>>> imprecise and cluster-wide (it uses a default edit-all ClusterRole), so
>>>>>>> when you create a PR, make sure that whoever is in charge of the Flink-K8s
>>>>>>> integration documents the exact permissions to create and attach to
>>>>>>> Flink's components.
>>>>>>>
>>>>>>> Thanks very much for your help!
>>>>>>> we will keep you updated.
>>>>>>> Omer
>>>>>>>
>>>>>>> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Omer,
>>>>>>>>
>>>>>>>> I think Matthias is right. The K8s HA services create and edit
>>>>>>>> config maps. Hence they need the rights to do this. In the native K8s
>>>>>>>> documentation there is a section about how to create a service account with
>>>>>>>> the right permissions [1].
>>>>>>>>
>>>>>>>> I think that our K8s HA documentation currently lacks this part. I
>>>>>>>> will create a PR to update the documentation.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
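>>>>>>>>
>>>>>>>> As a rough sketch of a narrower, namespaced alternative to a
>>>>>>>> cluster-wide edit binding, assuming the HA services only need to
>>>>>>>> manage ConfigMaps in the namespace the cluster runs in. The
>>>>>>>> resource names, the "default" namespace and the verb list are
>>>>>>>> illustrative assumptions, not taken from the Flink documentation:
>>>>>>>>
>>>>>>>> ```yaml
>>>>>>>> # Sketch only: names, namespace and verbs are assumptions.
>>>>>>>> apiVersion: v1
>>>>>>>> kind: ServiceAccount
>>>>>>>> metadata:
>>>>>>>>   name: flink-service-account      # hypothetical name
>>>>>>>>   namespace: default               # replace with your namespace
>>>>>>>> ---
>>>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>>>> kind: Role
>>>>>>>> metadata:
>>>>>>>>   name: flink-ha-configmaps        # hypothetical name
>>>>>>>>   namespace: default
>>>>>>>> rules:
>>>>>>>> # ConfigMaps live in the core API group, hence the empty string.
>>>>>>>> - apiGroups: [""]
>>>>>>>>   resources: ["configmaps"]
>>>>>>>>   verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>>>>>>>> ---
>>>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>>>> kind: RoleBinding
>>>>>>>> metadata:
>>>>>>>>   name: flink-ha-configmaps
>>>>>>>>   namespace: default
>>>>>>>> subjects:
>>>>>>>> - kind: ServiceAccount
>>>>>>>>   name: flink-service-account
>>>>>>>>   namespace: default
>>>>>>>> roleRef:
>>>>>>>>   apiGroup: rbac.authorization.k8s.io
>>>>>>>>   kind: Role
>>>>>>>>   name: flink-ha-configmaps
>>>>>>>> ```
>>>>>>>>
>>>>>>>> The JobManager and TaskManager pod specs would then need
>>>>>>>> serviceAccountName set to that service account.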
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <
>>>>>>>> matthias@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> I'm adding the Flink user ML to the conversation again.
>>>>>>>>>
>>>>>>>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <
>>>>>>>>> matthias@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Omer,
>>>>>>>>>> thanks for sharing the configuration. You're right: Using NFS for
>>>>>>>>>> HA's storageDir is fine.
>>>>>>>>>>
>>>>>>>>>> About the error message you're referring to: I haven't worked
>>>>>>>>>> with the HA k8s service, yet. But the RBAC is a good hint. Flink's native
>>>>>>>>>> Kubernetes documentation [1] points out that you can use a custom service
>>>>>>>>>> account. This one needs special permissions to start/stop pods
>>>>>>>>>> automatically (which does not apply in your case) but also to access
>>>>>>>>>> ConfigMaps. You might want to try setting the permission as described in
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>>>>>>>
>>>>>>>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <om...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>> My name is Omer. I am Daniel's DevOps engineer, and I will elaborate
>>>>>>>>>>> on our Flink situation.
>>>>>>>>>>> These are our Flink resource definitions, as generated by
>>>>>>>>>>> the helm template command (minus the log4j and metrics configuration and some
>>>>>>>>>>> sensitive data)
>>>>>>>>>>> ---
>>>>>>>>>>> # Source: flink/templates/flink-configmap.yaml
>>>>>>>>>>> apiVersion: v1
>>>>>>>>>>> kind: ConfigMap
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-config
>>>>>>>>>>>   labels:
>>>>>>>>>>>     app: flink
>>>>>>>>>>> data:
>>>>>>>>>>>   flink-conf.yaml: |
>>>>>>>>>>>     jobmanager.rpc.address: flink-jobmanager
>>>>>>>>>>>     jobmanager.rpc.port: 6123
>>>>>>>>>>>     jobmanager.execution.failover-strategy: region
>>>>>>>>>>>     jobmanager.memory.process.size: 8g
>>>>>>>>>>>     taskmanager.memory.process.size: 24g
>>>>>>>>>>>     taskmanager.memory.task.off-heap.size: 1g
>>>>>>>>>>>     taskmanager.numberOfTaskSlots: 4
>>>>>>>>>>>     queryable-state.proxy.ports: 6125
>>>>>>>>>>>     queryable-state.enable: true
>>>>>>>>>>>     blob.server.port: 6124
>>>>>>>>>>>     parallelism.default: 1
>>>>>>>>>>>     state.backend.incremental: true
>>>>>>>>>>>     state.backend: rocksdb
>>>>>>>>>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>>>>>>>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>>>>>>>>>     classloader.resolve-order: child-first
>>>>>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>>>>>     kubernetes.namespace: intel360-beta
>>>>>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> # Source: flink/templates/flink-service.yaml
>>>>>>>>>>> apiVersion: v1
>>>>>>>>>>> kind: Service
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-jobmanager
>>>>>>>>>>>   labels:
>>>>>>>>>>>     {}
>>>>>>>>>>> spec:
>>>>>>>>>>>   ports:
>>>>>>>>>>>   - name: http-ui
>>>>>>>>>>>     port: 8081
>>>>>>>>>>>     targetPort: http-ui
>>>>>>>>>>>   - name: tcp-rpc
>>>>>>>>>>>     port: 6123
>>>>>>>>>>>     targetPort: tcp-rpc
>>>>>>>>>>>   - name: tcp-blob
>>>>>>>>>>>     port: 6124
>>>>>>>>>>>     targetPort: tcp-blob
>>>>>>>>>>>   selector:
>>>>>>>>>>>     app: flink
>>>>>>>>>>>     component: jobmanager
>>>>>>>>>>> ---
>>>>>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>>>>>> apiVersion: apps/v1
>>>>>>>>>>> kind: Deployment
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-jobmanager
>>>>>>>>>>> spec:
>>>>>>>>>>>   replicas: 1
>>>>>>>>>>>   selector:
>>>>>>>>>>>     matchLabels:
>>>>>>>>>>>       app: flink
>>>>>>>>>>>       component: jobmanager
>>>>>>>>>>>   template:
>>>>>>>>>>>     metadata:
>>>>>>>>>>>       labels:
>>>>>>>>>>>         app: flink
>>>>>>>>>>>         component: jobmanager
>>>>>>>>>>>       annotations:
>>>>>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>>>>>     spec:
>>>>>>>>>>>       containers:
>>>>>>>>>>>       - name: jobmanager
>>>>>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>>>>>         args: [ "jobmanager" ]
>>>>>>>>>>>         ports:
>>>>>>>>>>>         - name: http-ui
>>>>>>>>>>>           containerPort: 8081
>>>>>>>>>>>         - name: tcp-rpc
>>>>>>>>>>>           containerPort: 6123
>>>>>>>>>>>         - name: tcp-blob
>>>>>>>>>>>           containerPort: 6124
>>>>>>>>>>>         resources:
>>>>>>>>>>>           {}
>>>>>>>>>>>         # Environment Variables
>>>>>>>>>>>         env:
>>>>>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>>>>>           value: "true"
>>>>>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>>>>>           value: "flink-jobmanager"
>>>>>>>>>>>         volumeMounts:
>>>>>>>>>>>         - name: flink-config
>>>>>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>>>>>           subPath: flink-conf.yaml
>>>>>>>>>>>         # NFS mounts
>>>>>>>>>>>         - name: flink-checkpoints
>>>>>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>>>>>         - name: flink-recovery
>>>>>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>>>>>       volumes:
>>>>>>>>>>>       - name: flink-config
>>>>>>>>>>>         configMap:
>>>>>>>>>>>           name: flink-config
>>>>>>>>>>>       # NFS volumes
>>>>>>>>>>>       - name: flink-checkpoints
>>>>>>>>>>>         nfs:
>>>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>>>>>       - name: flink-recovery
>>>>>>>>>>>         nfs:
>>>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>>>>>> ---
>>>>>>>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>>>>>>>> apiVersion: apps/v1
>>>>>>>>>>> kind: Deployment
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-taskmanager
>>>>>>>>>>> spec:
>>>>>>>>>>>   replicas: 7
>>>>>>>>>>>   selector:
>>>>>>>>>>>     matchLabels:
>>>>>>>>>>>       app: flink
>>>>>>>>>>>       component: taskmanager
>>>>>>>>>>>   template:
>>>>>>>>>>>     metadata:
>>>>>>>>>>>       labels:
>>>>>>>>>>>         app: flink
>>>>>>>>>>>         component: taskmanager
>>>>>>>>>>>       annotations:
>>>>>>>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>>>>>>>     spec:
>>>>>>>>>>>       containers:
>>>>>>>>>>>       - name: taskmanager
>>>>>>>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>>>>>>>         args: [ "taskmanager" ]
>>>>>>>>>>>         resources:
>>>>>>>>>>>           limits:
>>>>>>>>>>>             cpu: 6000m
>>>>>>>>>>>             memory: 24Gi
>>>>>>>>>>>           requests:
>>>>>>>>>>>             cpu: 6000m
>>>>>>>>>>>             memory: 24Gi
>>>>>>>>>>>         # Environment Variables
>>>>>>>>>>>         env:
>>>>>>>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>>>>>>>           value: "true"
>>>>>>>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>>>>>>>           value: "flink-jobmanager"
>>>>>>>>>>>         volumeMounts:
>>>>>>>>>>>         - name: flink-config
>>>>>>>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>>>>>>>           subPath: flink-conf.yaml
>>>>>>>>>>>         # NFS mounts
>>>>>>>>>>>         - name: flink-checkpoints
>>>>>>>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>>>>>>>         - name: flink-recovery
>>>>>>>>>>>           mountPath: "/opt/flink/recovery"
>>>>>>>>>>>       volumes:
>>>>>>>>>>>       - name: flink-config
>>>>>>>>>>>         configMap:
>>>>>>>>>>>           name: flink-config
>>>>>>>>>>>       # NFS volumes
>>>>>>>>>>>       - name: flink-checkpoints
>>>>>>>>>>>         nfs:
>>>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>>>>>>>       - name: flink-recovery
>>>>>>>>>>>         nfs:
>>>>>>>>>>>           server: "my-nfs-server.my-org"
>>>>>>>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>>>>>>>> ---
>>>>>>>>>>> # Source: flink/templates/flink-ingress.yaml
>>>>>>>>>>> apiVersion: extensions/v1beta1
>>>>>>>>>>> kind: Ingress
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: jobmanager
>>>>>>>>>>> spec:
>>>>>>>>>>>   rules:
>>>>>>>>>>>     - host: my.flink.job.manager.url
>>>>>>>>>>>       http:
>>>>>>>>>>>         paths:
>>>>>>>>>>>           - path: /
>>>>>>>>>>>             backend:
>>>>>>>>>>>               serviceName: flink-jobmanager
>>>>>>>>>>>               servicePort: 8081
>>>>>>>>>>> ---
>>>>>>>>>>>
>>>>>>>>>>> As you can see, we are using the skeleton of the standalone
>>>>>>>>>>> configuration as it is documented here:
>>>>>>>>>>>
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>>>>>>>>> with some per-company configuration, obviously, but still within
>>>>>>>>>>> the scope of that document.
>>>>>>>>>>>
>>>>>>>>>>> On a normal beautiful day, without the HA configuration,
>>>>>>>>>>> everything works fine.
>>>>>>>>>>> When we try to configure Kubernetes HA using this document:
>>>>>>>>>>>
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>>>> with the following parameters:
>>>>>>>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>>>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>>>>>>>
>>>>>>>>>>> the jobmanager fails with the following error:
>>>>>>>>>>> 2021-02-14 16:57:19,103 ERROR
>>>>>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>>>>>>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>>>>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>>>>>> executing: GET at:
>>>>>>>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>>>>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>>>>>> resource "configmaps" in API group "" in the namespace "default".
>>>>>>>>>>>
>>>>>>>>>>> So we added this line as well (as you can see in the
>>>>>>>>>>> flink-config configmap above):
>>>>>>>>>>> kubernetes.namespace: intel360-beta
>>>>>>>>>>> It is not part of the document, and I don't think Flink
>>>>>>>>>>> should be aware of the namespace it resides in, since that damages the
>>>>>>>>>>> modularity of the upper layers of configuration. Regardless, we added it
>>>>>>>>>>> and then got the following error:
>>>>>>>>>>>
>>>>>>>>>>> 2021-02-14 17:00:57,086 ERROR
>>>>>>>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>>>>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>>>>>>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>>>>>>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>>>>>>>> executing: GET at:
>>>>>>>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>>>>>>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>>>>>>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>>>>>>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>>>>>>>>>> resource "configmaps" in API group "" in the namespace "intel360-beta".
>>>>>>>>>>>
>>>>>>>>>>> which is basically the same error message, just directed at
>>>>>>>>>>> Flink's namespace.
>>>>>>>>>>> My question is: do I need to add RBAC to Flink's service
>>>>>>>>>>> account? I got the impression from the official Flink documentation and
>>>>>>>>>>> some blog responses that it is designed to function without any special
>>>>>>>>>>> permissions.
>>>>>>>>>>> If we do need RBAC, can you point to an official documentation
>>>>>>>>>>> reference for the exact permissions?
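>>>>>>>>>>>
>>>>>>>>>>> For reference, a minimal sketch of what such RBAC could look like,
>>>>>>>>>>> assuming the pods keep running under the "default" service account in
>>>>>>>>>>> the intel360-beta namespace, as the error message above indicates. The
>>>>>>>>>>> name flink-ha-configmaps is a placeholder, and the verb list is an
>>>>>>>>>>> assumption (create, edit and delete rights on ConfigMaps), not a list
>>>>>>>>>>> confirmed anywhere in this thread:
>>>>>>>>>>>
>>>>>>>>>>> ```yaml
>>>>>>>>>>> # Sketch only: lets the service account manage ConfigMaps in its own namespace.
>>>>>>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>>>>>>> kind: Role
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-ha-configmaps   # placeholder name
>>>>>>>>>>>   namespace: intel360-beta
>>>>>>>>>>> rules:
>>>>>>>>>>> - apiGroups: [""]             # core API group, as named in the error
>>>>>>>>>>>   resources: ["configmaps"]
>>>>>>>>>>>   # assumed verb set; the error above only shows that "get" is missing
>>>>>>>>>>>   verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>>>>>>>>>>> ---
>>>>>>>>>>> # Binds the Role to the service account from the error message.
>>>>>>>>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>>>>>>>>> kind: RoleBinding
>>>>>>>>>>> metadata:
>>>>>>>>>>>   name: flink-ha-configmaps   # placeholder name
>>>>>>>>>>>   namespace: intel360-beta
>>>>>>>>>>> subjects:
>>>>>>>>>>> - kind: ServiceAccount
>>>>>>>>>>>   name: default
>>>>>>>>>>>   namespace: intel360-beta
>>>>>>>>>>> roleRef:
>>>>>>>>>>>   apiGroup: rbac.authorization.k8s.io
>>>>>>>>>>>   kind: Role
>>>>>>>>>>>   name: flink-ha-configmaps
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> A namespaced Role rather than a ClusterRole keeps the grant limited
>>>>>>>>>>> to the namespace the cluster runs in; a dedicated service account
>>>>>>>>>>> instead of "default" would narrow it further.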
>>>>>>>>>>>
>>>>>>>>>>> NOTE: as you can see, our flink-checkpoints and recovery
>>>>>>>>>>> locations point to a local directory that is mounted from an NFS share
>>>>>>>>>>> common to all task managers and the job manager, since our infrastructure
>>>>>>>>>>> is bare-metal by design (although this one is hosted in AWS).
>>>>>>>>>>>
>>>>>>>>>>> thanks in advance
>>>>>>>>>>> Omer
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>> From: Daniel Peled <da...@gmail.com>
>>>>>>>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>>>>>>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>>>>>>>>>> To: <om...@gmail.com>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>> From: Matthias Pohl <ma...@ververica.com>
>>>>>>>>>>> Date: Thu, Feb 11, 2021 at 5:37 PM
>>>>>>>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>>>>>>>>>> To: Matthias Pohl <ma...@ververica.com>
>>>>>>>>>>> Cc: Daniel Peled <da...@gmail.com>, user <
>>>>>>>>>>> user@flink.apache.org>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> One other thing: It looks like you've set
>>>>>>>>>>> high-availability.storageDir to a local path file:///opt/flink/recovery.
>>>>>>>>>>> You should use a storage path that is accessible from all Flink cluster
>>>>>>>>>>> components (e.g. using S3). Only references are stored in Kubernetes
>>>>>>>>>>> ConfigMaps [1].
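>>>>>>>>>>>
>>>>>>>>>>> As a rough sketch of that suggestion, the HA section of
>>>>>>>>>>> flink-conf.yaml might look as follows. The bucket name my-flink-bucket
>>>>>>>>>>> is a placeholder that does not come from this thread, and an s3:// path
>>>>>>>>>>> assumes one of Flink's S3 filesystem plugins is enabled in the image:
>>>>>>>>>>>
>>>>>>>>>>> ```yaml
>>>>>>>>>>> # Sketch only: HA metadata goes to storage that every pod can reach.
>>>>>>>>>>> kubernetes.cluster-id: flink-cluster
>>>>>>>>>>> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>>>>>>> # placeholder bucket; only pointers to these files are kept in ConfigMaps
>>>>>>>>>>> high-availability.storageDir: s3://my-flink-bucket/flink/recovery
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> A file:// path can meet the same requirement only if it resolves to
>>>>>>>>>>> the same shared volume on every JobManager and TaskManager pod.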
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Matthias
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <
>>>>>>>>>>> matthias@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Daniel,
>>>>>>>>>>>> what's the exact configuration you used? Did you use the
>>>>>>>>>>>> resource definitions provided in the Standalone Flink on Kubernetes docs
>>>>>>>>>>>> [1]? Did you do certain things differently in comparison to the
>>>>>>>>>>>> documentation?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <
>>>>>>>>>>>> daniel.peled.work@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are using standalone Flink on Kubernetes,
>>>>>>>>>>>>> and we have followed the instructions in the following link,
>>>>>>>>>>>>> "Kubernetes HA Services":
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>>>>>>>> We were unable to make it work.
>>>>>>>>>>>>> We are facing a lot of problems.
>>>>>>>>>>>>> For example, some of the jobs don't start, complaining that
>>>>>>>>>>>>> there are not enough slots available, although there are enough slots, and
>>>>>>>>>>>>> it seems as if the job manager is NOT aware of all the task managers.
>>>>>>>>>>>>> In another scenario we were unable to run any job at all.
>>>>>>>>>>>>> The Flink dashboard is unresponsive and we get the error
>>>>>>>>>>>>> "flink service temporarily unavailable due to an ongoing
>>>>>>>>>>>>> leader election. please refresh".
>>>>>>>>>>>>> We believe we are missing some configurations.
>>>>>>>>>>>>> Are there any more detailed instructions?
>>>>>>>>>>>>> Any suggestions/tips?
>>>>>>>>>>>>> Attached is the log of the job manager in one of the attempts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please give me some advice.
>>>>>>>>>>>>> BR,
>>>>>>>>>>>>> Danny
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>