Posted to user@flink.apache.org by Mark Harris <ma...@hivehome.com> on 2018/10/23 11:20:51 UTC

KafkaException or ExecutionStateChange failure on job startup

Hi,
We regularly see the following two exceptions in a number of jobs shortly
after they have been resumed during our Flink cluster startup:

org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
    at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
    at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:749)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:327)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:283)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1344)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:473)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
    ... 21 more

java.lang.Exception: Failed to send ExecutionStateChange notification to JobManager
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:439)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:423)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@ip-10-150-24-22.eu-west-1.compute.internal:41775/user/jobmanager#163569829]] after [30000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)


Neither seems related to the job code at all; they look like problems with
the metrics in the Flink Kafka connector and with something internal to
Flink. They seem to happen once at startup, and don't repeat once the
cluster reaches a steady state.

The jobs also appear to be running correctly, even though these exceptions
show up in their "Exception" tab in the JobManager.

Is there something that we need to fix in our setup? Are there any
implications, such as missing metrics?

Best regards,

Mark Harris

-- 
hivehome.com <http://www.hivehome.com>

Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and
intended only for the use of the individual(s) to which it is addressed. It
may contain information which is confidential and/or covered by legal
professional or other privilege. The views expressed in this email are not
necessarily the views of Centrica plc, and the company, its directors,
officers or employees make no representation or accept any liability for
their accuracy or completeness unless expressly stated to the contrary.
Centrica Hive Limited (company no: 5782908), registered in England and
Wales with its registered office at Millstream, Maidenhead Road, Windsor,
Berkshire SL4 5GD.



Re: KafkaException or ExecutionStateChange failure on job startup

Posted by Mark Harris <ma...@hivehome.com>.
Hi Dominik

Setting that bit of configuration seems to have done the trick for the
MBean exception.

Many thanks for your help.

Best regards,

Mark

On Tue, 23 Oct 2018 at 14:41, Dominik Wosiński <wo...@gmail.com> wrote:

> Hey Mark,
>
> Do you use more than one Kafka consumer in your jobs? I think this relates
> to a known issue in Kafka:
> https://issues.apache.org/jira/browse/KAFKA-3992.
> The problem is that if you don't provide a client ID for your
> *KafkaConsumer*, Kafka assigns one, but it does so in an unsynchronized
> way, so it can end up assigning the same ID to multiple Consumer
> instances. This is probably what happens when multiple jobs are resumed
> at the same time.
>
> What you could try is to assign the *consumer.id* using the properties
> passed to each consumer. This should help solve the issue.
>
> Best Regards,
> Dom.




Re: KafkaException or ExecutionStateChange failure on job startup

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey Mark,

Do you use more than one Kafka consumer in your jobs? I think this relates
to a known issue in Kafka:
https://issues.apache.org/jira/browse/KAFKA-3992.
The problem is that if you don't provide a client ID for your
*KafkaConsumer*, Kafka assigns one, but it does so in an unsynchronized way,
so it can end up assigning the same ID to multiple Consumer instances.
This is probably what happens when multiple jobs are resumed at the same
time.
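
To make the "Caused by" part of the first trace concrete, here is a small
sketch using only the JDK's JMX classes (no Kafka involved). It shows that
registering two MBeans under one ObjectName fails with
InstanceAlreadyExistsException, which is the situation two consumers sharing
a client id can run into. The class name DuplicateMBeanDemo, the Dummy MBean
and the "demo.consumer" domain are made up for illustration; how Kafka's
JmxReporter reaches this state internally is not shown here.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class DuplicateMBeanDemo {
    // Standard MBean convention: the interface is named <class name> + "MBean".
    public interface DummyMBean {
        int getValue();
    }

    public static final class Dummy implements DummyMBean {
        @Override
        public int getValue() {
            return 1;
        }
    }

    // Registers two MBeans under the same name and reports whether the
    // second registration was rejected as a duplicate.
    static boolean secondRegistrationFails(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // Hypothetical name, shaped like the one in the stack trace.
            ObjectName name = new ObjectName(
                "demo.consumer:type=consumer-node-metrics,client-id=" + clientId);
            server.registerMBean(new Dummy(), name);
            try {
                server.registerMBean(new Dummy(), name);
                return false;
            } catch (InstanceAlreadyExistsException e) {
                return true;
            } finally {
                // Clean up so the demo can be run repeatedly in one JVM.
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }
}
```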

What you could try is to assign the *consumer.id* using the properties
passed to each consumer. This should help solve the issue.
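
A minimal sketch of what such properties might look like, assuming the Java
consumer seen in the stack trace. For that consumer, the property behind the
"client-id" tag in the MBean name is "client.id", so the sketch sets that
key. The helper name ConsumerProps.forSource and the "<job>-<source>" naming
scheme are hypothetical, not something the thread or the connector
prescribes.

```java
import java.util.Properties;

final class ConsumerProps {
    // Builds consumer properties with an explicit client id, so Kafka does
    // not fall back to auto-generated ids such as "consumer-1".
    static Properties forSource(String jobName, String sourceName,
                                String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: job and source names together are unique per JVM.
        props.setProperty("client.id", jobName + "-" + sourceName);
        return props;
    }
}
```

The resulting Properties object would be handed to the Flink Kafka consumer
in place of the properties used today. If several parallel subtasks of one
source share a TaskManager JVM, a per-subtask suffix may also be needed; the
thread does not settle that.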

Best Regards,
Dom.



