You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yang Wang (Jira)" <ji...@apache.org> on 2021/01/15 05:55:00 UTC
[jira] [Comment Edited] (FLINK-20982) Flink TaskManager throws
RegistrationTimeoutException when using Kubernetes HA
[ https://issues.apache.org/jira/browse/FLINK-20982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17265715#comment-17265715 ]
Yang Wang edited comment on FLINK-20982 at 1/15/21, 5:54 AM:
-------------------------------------------------------------
I use the following four yaml files to deploy a Flink application with HA enable on minikube. It works well.
Please note that we do not need to create the internal service. Because the TaskManager will use K8sHAService for the leader retrieval. This requires the JobManager is binding to the pod ip address, not the service.
{code:java}
# flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
kubernetes.cluster-id: standalone-k8s-ha-app1
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss://flink-debug-yiqi/flink-ha
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
# Replace with a valid S3 or OSS account
fs.oss.endpoint: xxxx
fs.oss.accessKeyId: xxxx
fs.oss.accessKeySecret: xxxx
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender # Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO # The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO # Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF logger.rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
logger.rest.level = ERROR
logger.minirest.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
logger.minirest.level = ERROR
{code}
{code:java}
# jobmanager-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
initContainers:
- name: artifacts-fetcher
image: busybox
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
- name: jobmanager
image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
imagePullPolicy: Always
env:
- name: ENABLE_BUILT_IN_PLUGINS
value: flink-oss-fs-hadoop-1.12.1.jar
- name: _POD_IP_ADDRESS
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.statemachine.StateMachineExample", "--host", "$(_POD_IP_ADDRESS)"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- mountPath: /opt/flink/usrlib
name: flink-artifact
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-artifact
emptyDir: { }
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
{code}
{code:java}
# taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
initContainers:
- name: artifacts-fetcher
image: busybox
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
- name: taskmanager
image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
imagePullPolicy: Always
env:
- name: ENABLE_BUILT_IN_PLUGINS
value: flink-oss-fs-hadoop-1.12.1.jar
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- mountPath: /opt/flink/usrlib
name: flink-artifact
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-artifact
emptyDir: { }
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties{code}
{code:java}
# jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: LoadBalancer
ports:
- name: rest
port: 8081
selector:
app: flink
component: jobmanager
{code}
was (Author: fly_in_gis):
I use the following four yaml files to deploy a Flink application with HA enable on minikube. It works well.
Please note that we do not need to create the internal service. Because the TaskManager will use K8sHAService for the leader retrieval. This requires the JobManager is binding to the pod ip address, not the service.
{code:java}
// flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
kubernetes.cluster-id: standalone-k8s-ha-app1
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss://flink-debug-yiqi/flink-ha
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
# Replace with a valid S3 or OSS account
fs.oss.endpoint: xxxx
fs.oss.accessKeyId: xxxx
fs.oss.accessKeySecret: xxxx
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender # Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO # The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO # Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF logger.rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
logger.rest.level = ERROR
logger.minirest.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
logger.minirest.level = ERROR
{code}
{code:java}
# jobmanager-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
initContainers:
- name: artifacts-fetcher
image: busybox
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
- name: jobmanager
image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
imagePullPolicy: Always
env:
- name: ENABLE_BUILT_IN_PLUGINS
value: flink-oss-fs-hadoop-1.12.1.jar
- name: _POD_IP_ADDRESS
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.statemachine.StateMachineExample", "--host", "$(_POD_IP_ADDRESS)"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- mountPath: /opt/flink/usrlib
name: flink-artifact
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-artifact
emptyDir: { }
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
{code}
{code:java}
// taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
initContainers:
- name: artifacts-fetcher
image: busybox
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'https://flink-debug-yiqi.oss-cn-beijing.aliyuncs.com/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
- name: taskmanager
image: registry.cn-beijing.aliyuncs.com/streamcompute/flink:1.12.1
imagePullPolicy: Always
env:
- name: ENABLE_BUILT_IN_PLUGINS
value: flink-oss-fs-hadoop-1.12.1.jar
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- mountPath: /opt/flink/usrlib
name: flink-artifact
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-artifact
emptyDir: { }
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties{code}
{code:java}
// jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: LoadBalancer
ports:
- name: rest
port: 8081
selector:
app: flink
component: jobmanager
{code}
> Flink TaskManager throws RegistrationTimeoutException when using Kubernetes HA
> ------------------------------------------------------------------------------
>
> Key: FLINK-20982
> URL: https://issues.apache.org/jira/browse/FLINK-20982
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.12.0
> Reporter: Levani Kokhreidze
> Priority: Major
>
> I'm experimenting with Flink 1.12 release and testing out Kubernetes HA feature with Minikube. Unfortunately, when using Kubernetes HA, *TaskManager* can not register at *ResourceManager* and it throws *RegistrationTimeoutException* after 5min.
> I can see that JobManager creates config maps with Kubernetes HA
>
> {code:java}
> ➜ k get configmaps
> NAME DATA AGE
> cluster1-00000000000000000000000000000000-jobmanager-leader 2 109s
> cluster1-dispatcher-leader 4 2m
> cluster1-resourcemanager-leader 2 2m
> cluster1-restserver-leader 2 2m
> {code}
>
> And when inspecting *{{cluster1-resourcemanager-leader}}* it seems to have correct value.
> {code:java}
> ➜ k describe configmaps cluster1-resourcemanager-leader
> Name: cluster1-resourcemanager-leader
> Namespace: default
> Labels: app=cluster1
> configmap-type=high-availability
> type=flink-native-kubernetes
> Annotations: control-plane.alpha.kubernetes.io/leader:
> {"holderIdentity":"c227d599-9dee-4f98-81ed-dde40a1865d6","leaseDuration":15.000000000,"acquireTime":"2021-01-14T11:26:27.660920Z","renewTi...Data
> ====
> address:
> ----
> akka.tcp://flink@172.18.0.9:6565/user/rpc/resourcemanager_0
> sessionId:
> ----
> 8ede2bdf-4aa2-4da7-9a7a-bfc737e277bb
> Events: <none>{code}
> *172.18.0.9* is indeed the IP of the leader.
> Connectivity from TaskManager pod also checks out.
> {code:java}
> bash-4.2$ nc -vz 172.18.0.9 6565
> Ncat: Version 7.50 ( https://nmap.org/ncat )
> Ncat: Connected to 172.18.0.9:6565.
> Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.{code}
> Here's flink-conf.yaml file.
> {code:java}
> bash-4.2$ cat /opt/flink/conf/flink-conf.yaml
> jobmanager.rpc.port: 6565
> jobmanager.heap.size: 1024m
> jobmanager.execution.failover-strategy: region
> blob.server.port: 10901
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> taskmanager.rpc.port: 6565
> taskmanager.data.port: 10901
> parallelism.default: 1
> # KUBERNETES HA
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> kubernetes.cluster-id: cluster1
> # ZOOKEEPER HA
> #high-availability: zookeeper
> #high-availability.zookeeper.path.root: /flink
> #high-availability.cluster-id: cluster1
> high-availability.jobmanager.port: 6565
> s3.path.style.access: true
> s3.endpoint: http://minio.streaming-cluster-minio.svc.cluster.local:9000
> web.submit.enable: false
> metrics.reporters: prom
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 8097
> s3.secret-key: minio123
> s3.access-key: ****
> taskmanager.host: 172.18.0.10
> high-availability.storageDir: s3://state/ha
> {code}
> The same setup, but with *zookeeper* HA works without any issues.
> JobManager starts with the following command:
> {code:java}
> exec "$FLINK_HOME"/bin/standalone-job.sh start-foreground "$@"
> {code}
> This is *ClusterRole* and *ClusterRoleBinding* that is granted to the namespace.
> {code:java}
> kubectl create clusterrole cr --verb="*" --resource=configmaps
> kubectl create clusterrolebinding crb --clusterrole=cr --serviceaccount=streaming-cluster-flink:default{code}
> Exception stacktrace happening in TaskManager
> {code:java}
> 2021-01-14 11:23:13,149 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@172.18.0.10:6565/user/rpc/taskmanager_0.
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
> 2021-01-14 11:23:13,158 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)