Posted to issues@flink.apache.org by "Yang Wang (Jira)" <ji...@apache.org> on 2021/01/15 05:55:00 UTC

[jira] [Comment Edited] (FLINK-20982) Flink TaskManager throws RegistrationTimeoutException when using Kubernetes HA

    [ https://issues.apache.org/jira/browse/FLINK-20982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265715#comment-17265715 ] 

Yang Wang edited comment on FLINK-20982 at 1/15/21, 5:54 AM:
-------------------------------------------------------------

I use the following four YAML files to deploy a Flink application with HA enabled on minikube. It works well.

Please note that we do not need to create the internal service, because the TaskManager will use the Kubernetes HA services for leader retrieval. This requires that the JobManager binds to the pod IP address, not the service name.
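To double-check the pod-IP binding, you can compare the address stored in the leader ConfigMap with the JobManager pod IP. A minimal sketch, assuming the ConfigMap follows the <cluster-id>-resourcemanager-leader naming and the "address" data key shown in the issue description below:
{code:java}
# Leader address that the TaskManagers retrieve via the K8s HA services
kubectl get configmap standalone-k8s-ha-app1-resourcemanager-leader \
  -o jsonpath='{.data.address}'; echo

# The host in the address above should match this pod IP
kubectl get pod -l app=flink,component=jobmanager \
  -o jsonpath='{.items[0].status.podIP}'; echo
{code}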

{code:java}
# flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    kubernetes.cluster-id: standalone-k8s-ha-app1
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: oss://flink-debug-yiqi/flink-ha
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    # Replace with a valid S3 or OSS account
    fs.oss.endpoint: xxxx
    fs.oss.accessKeyId: xxxx
    fs.oss.accessKeySecret: xxxx
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

    logger.rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
    logger.rest.level = ERROR
    logger.minirest.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
    logger.minirest.level = ERROR
{code}
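One thing that is easy to miss: with Kubernetes HA the JobManager and TaskManager pods read and write the leader ConfigMaps directly, so the service account they run under needs RBAC permissions on ConfigMaps. A minimal sketch, mirroring the ClusterRole/ClusterRoleBinding commands from the issue description below (the role/binding names and the namespace:serviceaccount are placeholders; adjust them to your deployment):
{code:java}
# Grant the default service account in the "default" namespace full access to ConfigMaps
kubectl create clusterrole flink-ha-cr --verb="*" --resource=configmaps
kubectl create clusterrolebinding flink-ha-crb --clusterrole=flink-ha-cr \
  --serviceaccount=default:default
{code}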
 
{code:java}
# jobmanager-job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      initContainers:
        - name: artifacts-fetcher
          image: busybox
          imagePullPolicy: IfNotPresent
          # Use wget or other tools to get user jars from remote storage
          command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
          volumeMounts:
            - mountPath: /flink-artifact
              name: flink-artifact
      containers:
        - name: jobmanager
          image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
          imagePullPolicy: Always
          env:
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-oss-fs-hadoop-1.12.1.jar
          - name: _POD_IP_ADDRESS
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP
          args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.statemachine.StateMachineExample", "--host", "$(_POD_IP_ADDRESS)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - mountPath: /opt/flink/usrlib
              name: flink-artifact
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-artifact
          emptyDir: { }
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
{code}
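Because the JobManager runs as a Kubernetes Job with restartPolicy: OnFailure and binds its RPC endpoint to the pod IP injected via the Downward API, you can also exercise the HA path on minikube by killing the JobManager pod and watching the replacement recover the job from high-availability.storageDir. A sketch for testing only, not part of the manifests:
{code:java}
# Kill the current JobManager pod; the Job controller starts a replacement,
# which writes the new leader information back to the HA ConfigMaps
kubectl delete pod -l app=flink,component=jobmanager
kubectl get pods -l app=flink -w
{code}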
 
{code:java}
# taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      initContainers:
        - name: artifacts-fetcher
          image: busybox
          imagePullPolicy: IfNotPresent
          # Use wget or other tools to get user jars from remote storage
          command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
          volumeMounts:
            - mountPath: /flink-artifact
              name: flink-artifact
      containers:
      - name: taskmanager
        image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
        imagePullPolicy: Always
        env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-oss-fs-hadoop-1.12.1.jar
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - mountPath: /opt/flink/usrlib
          name: flink-artifact
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-artifact
        emptyDir: { }
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
{code}
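Note that the TaskManager is started without any JobManager address: with Kubernetes HA it discovers the ResourceManager leader through the ConfigMaps. A quick way to check whether the TaskManagers have registered (a sketch; the grep pattern is only a heuristic over the TaskExecutor log output):
{code:java}
# Inspect a TaskManager log for registration messages
kubectl logs deployment/flink-taskmanager | grep -i registration
{code}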
{code:java}
# jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: LoadBalancer
  ports:
  - name: rest
    port: 8081
  selector:
    app: flink
    component: jobmanager
{code}
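For completeness, this is the order in which I would apply the four files on minikube (file names as given in the comments above):
{code:java}
kubectl apply -f flink-configuration-configmap.yaml
kubectl apply -f jobmanager-job.yaml
kubectl apply -f taskmanager-deployment.yaml
kubectl apply -f jobmanager-rest-service.yaml

# Once the JobManager is running, the HA leader ConfigMaps should appear
kubectl get pods -l app=flink
kubectl get configmaps
{code}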
 


> Flink TaskManager throws RegistrationTimeoutException when using Kubernetes HA
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-20982
>                 URL: https://issues.apache.org/jira/browse/FLINK-20982
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0
>            Reporter: Levani Kokhreidze
>            Priority: Major
>
> I'm experimenting with the Flink 1.12 release and testing out the Kubernetes HA feature with Minikube. Unfortunately, when using Kubernetes HA, the *TaskManager* cannot register with the *ResourceManager* and throws a *RegistrationTimeoutException* after 5 minutes.
> I can see that the JobManager creates ConfigMaps with Kubernetes HA:
>  
> {code:java}
> ➜ k get configmaps
> NAME                                                          DATA   AGE
> cluster1-00000000000000000000000000000000-jobmanager-leader   2      109s
> cluster1-dispatcher-leader                                    4      2m
> cluster1-resourcemanager-leader                               2      2m
> cluster1-restserver-leader                                    2      2m
> {code}
>   
> And when inspecting *{{cluster1-resourcemanager-leader}}*, it seems to have the correct value.
> {code:java}
> ➜ k describe configmaps cluster1-resourcemanager-leader
> Name:         cluster1-resourcemanager-leader
> Namespace:    default
> Labels:       app=cluster1
>               configmap-type=high-availability
>               type=flink-native-kubernetes
> Annotations:  control-plane.alpha.kubernetes.io/leader:
>                 {"holderIdentity":"c227d599-9dee-4f98-81ed-dde40a1865d6","leaseDuration":15.000000000,"acquireTime":"2021-01-14T11:26:27.660920Z","renewTi...
> 
> Data
> ====
> address:
> ----
> akka.tcp://flink@172.18.0.9:6565/user/rpc/resourcemanager_0
> sessionId:
> ----
> 8ede2bdf-4aa2-4da7-9a7a-bfc737e277bb
> Events:  <none>
> {code}
> *172.18.0.9* is indeed the IP of the leader.
> Connectivity from the TaskManager pod also checks out.
> {code:java}
> bash-4.2$ nc -vz 172.18.0.9 6565
> Ncat: Version 7.50 ( https://nmap.org/ncat )
> Ncat: Connected to 172.18.0.9:6565.
> Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
> {code}
> Here's the flink-conf.yaml file.
> {code:java}
> bash-4.2$ cat /opt/flink/conf/flink-conf.yaml
> jobmanager.rpc.port: 6565
> jobmanager.heap.size: 1024m
> jobmanager.execution.failover-strategy: region
> blob.server.port: 10901
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> taskmanager.rpc.port: 6565
> taskmanager.data.port: 10901
> parallelism.default: 1
> # KUBERNETES HA
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> kubernetes.cluster-id: cluster1
> # ZOOKEEPER HA
> #high-availability: zookeeper
> #high-availability.zookeeper.path.root: /flink
> #high-availability.cluster-id: cluster1
> high-availability.jobmanager.port: 6565
> s3.path.style.access: true
> s3.endpoint: http://minio.streaming-cluster-minio.svc.cluster.local:9000
> web.submit.enable: false
> metrics.reporters: prom
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 8097
> s3.secret-key: minio123
> s3.access-key: ****
> taskmanager.host: 172.18.0.10
> high-availability.storageDir: s3://state/ha
> {code}
> The same setup, but with *zookeeper* HA, works without any issues.
> The JobManager starts with the following command:
> {code:java}
> exec "$FLINK_HOME"/bin/standalone-job.sh start-foreground "$@"
> {code}
> These are the *ClusterRole* and *ClusterRoleBinding* granted to the namespace.
> {code:java}
> kubectl create clusterrole cr --verb="*" --resource=configmaps
> kubectl create clusterrolebinding crb --clusterrole=cr --serviceaccount=streaming-cluster-flink:default
> {code}
> Exception stack trace from the TaskManager:
> {code:java}
> 2021-01-14 11:23:13,149 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://flink@172.18.0.10:6565/user/rpc/taskmanager_0.
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
> 2021-01-14 11:23:13,158 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)