Posted to user@flink.apache.org by John Stone <el...@gmail.com> on 2018/10/31 23:43:52 UTC
TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled
I have successfully managed to deploy a Flink cluster in Kubernetes without JobManager high availability. Everything works great. The moment I enable high availability, TaskManagers fail to contact the JobManager. My configurations and logs are below. Can someone point me in the correct direction? Many thanks!
Note the following:
- Cluster is running Flink 1.6.2.
- Logs for the JobManager and TaskManagers indicate that they can successfully connect to the Zookeeper instances.
- The Zookeeper instances reside as standard EC2 instances while the Flink cluster is fully contained in Kubernetes. Everything is within the same VPC.
- I have tried launching the TaskManagers with a configuration that omits "high-availability: zookeeper" (i.e. the JobManager and TaskManagers ran with flink-conf.yaml files that differed only in that one line). This did not help.
============================================================================
TaskManager Log Snippet
2018-10-31 23:14:09,035 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
2018-10-31 23:14:09,035 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-jobmanager:43982] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:43982]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2018-10-31 23:14:09,128 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flink-jobmanager/100.70.34.197:43982
2018-10-31 23:14:19,135 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:43982/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:43982/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2018-10-31 23:14:28,065 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@flink-taskmanager-75f746bdf7-fpw9h:38130/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
============================================================================
Configurations
JobManager Deployment
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config
-----------------------------------------------------------------------------
JobManager Service
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
-----------------------------------------------------------------------------
TaskManager Deployment
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.6.2-hadoop28-scala_2.11
        command: ["/bin/sh", "-c", "cp /opt/flink/opt/flink-s3-fs-hadoop-1.6.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.6.2.jar && /docker-entrypoint.sh taskmanager"]
        ports:
        - containerPort: 6121
          name: data
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        - name: FLINK_CONF_DIR
          value: /etc/flink
        resources:
          limits:
            memory: "2Gi"
          requests:
            memory: "2Gi"
        volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: flink-hadoop-config
          mountPath: /etc/hadoop/conf
      volumes:
      - name: flink-config
        configMap:
          name: flink-config
      - name: flink-hadoop-config
        configMap:
          name: flink-hadoop-config
-----------------------------------------------
Hadoop ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-hadoop-config
  labels:
    app: flink
data:
  core-site.xml: |
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
    </configuration>
------------------------------------------------
Flink ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    blob.server.port: 6124
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    high-availability.storageDir: s3a://flink-test-bucket/flink-zk-storage
    high-availability.zookeeper.path.root: /flink-test
    high-availability.cluster-id: /flink-test
    jobmanager.heap.size: 1024m
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    parallelism.default: 1
    query.server.port: 6125
    rest.port: 8081
    state.backend.incremental: true
    state.backend: rocksdb
    state.checkpoints.dir: s3a://flink-test-bucket/flink-checkpoints
    taskmanager.heap.size: 1024m
    taskmanager.numberOfTaskSlots: 4
  log4j-console.properties: |
    log4j.rootLogger=INFO, console, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Re: TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi John,
Glad you resolved the issue. Also thanks for sharing the solution with ML!
Best,
Dawid
On 01/11/2018 16:22, John Stone wrote:
> I've managed to resolve the issue. With HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:40641
>
> Without HA enabled, you will see this message in the logs:
>
> 2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> HA causes a random port assignment for the ResourceManager portion of the JobManager. This can be controlled by setting the high-availability.jobmanager.port to a fixed port and exposing it in the Kubernetes network configuration.
Re: TaskManagers cannot contact JobManager in Kubernetes when JobManager HA is enabled
Posted by John Stone <el...@gmail.com>.
I've managed to resolve the issue. With HA enabled, you will see this message in the logs:
2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:40641
Without HA enabled, you will see this message in the logs:
2018-11-01 13:38:52,467 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor system started at akka.tcp://flink@flink-jobmanager:6123
HA causes a random port assignment for the ResourceManager portion of the JobManager. This can be controlled by setting the high-availability.jobmanager.port to a fixed port and exposing it in the Kubernetes network configuration.
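For reference, a minimal sketch of the fix described above. The port number 50010 is an arbitrary choice here; any fixed, otherwise-unused port should work. Pin the HA RPC port in flink-conf.yaml, then expose that same port on the JobManager container and Service:

```yaml
# flink-config ConfigMap, flink-conf.yaml: pin the otherwise-random HA port
high-availability.jobmanager.port: 50010

# JobManager Deployment, under the container's ports:
# - containerPort: 50010
#   name: ha-rpc

# flink-jobmanager Service, under spec.ports:
# - name: ha-rpc
#   port: 50010
```

With this in place, the actor system log line should report `akka.tcp://flink@flink-jobmanager:50010` instead of a random port, and the TaskManagers can reach the ResourceManager through the Service.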