Posted to user@flink.apache.org by Jose Miguel Tejedor Fernandez <jo...@rovio.com> on 2018/01/09 15:38:00 UTC

Stream job failed after increasing number retained checkpoints

Hello,

I have several stream jobs running (v. 1.3.1) in production which always
fail after a fixed period of around 30 hours of execution. This is the
WARN trace before they fail:

Association with remote system
[akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876] has
failed, address is now gated for [5000] ms. Reason: [Association
failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876]]
Caused by: [No response from remote for outbound association.
Handshake timed out after [20000 ms].


The main change in the job configuration was to increase
state.checkpoints.num-retained from 1 to *2880*. I am using the RocksDB
state backend with asynchronous snapshots to persist the state. (I attach
some screenshots with the checkpoint configuration from the web UI; the
relevant settings are also sketched below.)
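
For reference, these are roughly the relevant entries in our flink-conf.yaml
(the checkpoint directory below is a placeholder, not our real path):

    state.backend: rocksdb
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    state.checkpoints.num-retained: 2880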


   - Is my assumption correct that the increase of
   state.checkpoints.num-retained is causing the problem? Is there any
   known issue regarding this?


   - Besides, is there any way to increase the Akka handshake timeout from
   the current 20000 ms to a higher value? I think it may be convenient to
   increase the timeout to 1 minute instead.


BR

Re: Stream job failed after increasing number retained checkpoints

Posted by Jose Miguel Tejedor Fernandez <jo...@rovio.com>.
Thanks Piotr and Stefan,

The problem was the overhead in the heap memory usage of the JobManager
when increasing the number of retained checkpoints. It was solved once I
reverted that value to one.
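
In flink-conf.yaml terms that simply means going back to the default:

    state.checkpoints.num-retained: 1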

BR

This is the actual error in the JobManager log at the time of the OOM:

2018-01-08 22:27:09,293 WARN  org.jboss.netty.channel.socket.nio.AbstractNioSelector - Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2018-01-08 22:27:15,796 ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [flink-akka.actor.default-dispatcher-22840] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,288 ERROR akka.actor.ActorSystemImpl - Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-22839] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,882 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard root cache directory /tmp/flink-web-f75e187d-3d08-4864-ba08-1740c8586be1
2018-01-08 22:27:17,394 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard jar upload directory /tmp/flink-web-2c8657f2-9b87-4964-bde4-9997ef31966d
2018-01-08 22:27:19,863 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:44378



Here’s a test with Abba after it has accumulated state for 21 hours:
it seems that creating a nightly savepoint won’t necessarily be scalable,
so being able to use incremental checkpoints would still seem appealing, if
possible. Or why not make a savepoint somehow by copying the required files
of a checkpoint instead - but I doubt that Flink would support that.
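
To make that last idea concrete, what I would hope to be able to do with a
retained checkpoint is roughly this (the path is a placeholder, and I have
not verified that this is supported for incremental RocksDB checkpoints in
1.3):

    bin/flink run -s <path-to-retained-checkpoint-metadata> my-job.jar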



*José Miguel Tejedor Fernández*
Server developer
jose.fernandez@rovio.com

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



On Wed, Jan 10, 2018 at 3:08 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> This Task Manager log is suggesting that problems lays on the Job Manager
> side (no visible gap in the logs, GC Time reported is accumulated and 31
> seconds accumulated over 963 gc collections is low value). Could you show
> the Job Manager log itself? Probably it’s the own that’s causing the
> TaskManager to timeout.
>
> On the other hand, I see that Task Manager max heap size is ~5GB and I
> assume this is the same setting for the Job manager. A Stefan pointed out,
> there is some memory overhead on the Job Manager for retaining the
> checkpoint and it is around couple of hundred bytes (maybe even 1KB) per
> operator instance. By doing quick math:
>
> 2880 checkpoints * 10 task managers * 10 operators in the job * 8
> parallelism per task manager * 500 bytes = ~1GB
>
> The answer might be that you just need to increase the Job Manager max
> heap to retain 2880 checkpoints.
>
> Piotrek
>
> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <
> jose.fernandez@rovio.com> wrote:
>
> Hi,
>
> I wonder what reason you might have that you ever want such a huge number
>> of retained checkpoints?
>
>
> The Flink jobs running on EMR cluster require a checkpoint at midnight.
> (In our use case we need to synch a loaded delta to our a third party
> partner with the streamed data). The delta load the whole day data and
> that's why we wanted to have available the midnight's checkpoint to start
> from there.
> We could also make a savepoint at midnight, but it’s not as handy (we
> would need to build our own tooling to do it), and it can’t benefit from
> the smaller latency of an incremental checkpoint. Another thining is that
> implementing our own savepoint tool is a bit hard to monitor. Besides,
> retaining several having checkpoints created every minute is that it would
> also allow us to load a delta at any time. Please, if there are better ways
> of achieving this, let me know.
>
> From where does the log trace come from?
>
>
> It comes from the TaskManager.
>
> Please search on the opposite side of the time outing connection for
>> possible root cause of the timeout including:
>> - possible error/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural
>> gaps in the logs)
>> - machine health (CPU usage, disks usage, network connections)
>
>
> It seems that TaskManager disconnect from JobManager and then cannot reach
> it again and I cannot tell the reason. I think machine health metrics
> mentioned above seems to be OK. Would you say *Direct memory stats *usage
> is correct? What is the way to check the GC pauses?
> Those are some traces from the TaskManager log, before/after it detached
> from JobManager
>
> 2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476,
> GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP:
> 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Direct memory stats: Count: 100, Total Capacity: 29942814,
> Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Off-heap pool stats: [Code Cache: 42/42/240 MB
> (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)],
> [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>             - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476,
> GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher
>                  - Detected unreachable: [akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341]
> 2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager
>                    - TaskManager akka://flink/user/taskmanager
> disconnects from JobManager akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no
> longer reachable
> 2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager
>                    - Cancelling all computations and discarding all cached
> data.
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Attempting to fail task externally Sink: Discarded
> events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Sink: Discarded events (4/4) (
> 50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager
> disconnects from JobManager akka.tcp://flink@ip-10-1-51-
> 209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no
> longer reachable
> at org.apache.flink.runtime.taskmanager.TaskManager.
> handleJobManagerDisconnect(TaskManager.scala:1095)
> at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at org.apache.flink.runtime.taskmanager.TaskManager.
> aroundReceive(TaskManager.scala:120)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(
> DeathWatch.scala:44)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Triggering cancellation of task code Sink: Discarded
> events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - Attempting to fail task externally Sink: CounterSink
> (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
>
>
> *José Miguel Tejedor Fernández*
> Server developer
> jose.fernandez@rovio.com
>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com
>
>
>
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> there is no known limitation in the strict sense, but you might run out
>> of dfs space or job manager memory if you keep around a huge number
>> checkpoints. I wonder what reason you might have that you ever want such a
>> huge number of retained checkpoints? Usually keeping one checkpoint should
>> do the job, maybe a couple more if you are very afraid about corruption
>> that goes beyond your DFSs capabilities to handle it. Is there any reason
>> for that or maybe a misconception about increasing the number of retained
>> checkpoints is good for?
>>
>> Best,
>> Stefan
>>
>>
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com>:
>>
>> Hi,
>>
>> Increasing akka’s timeouts is rarely a solution for any problems - it
>> either do not help, or just mask the issue making it less visible. But yes,
>> it is possible to bump the limits: https://ci.apache.org/
>> projects/flink/flink-docs-release-1.3/setup/config.html#dist
>> ributed-coordination-via-akka
>>
>> I don’t think that state.checkpoints.num-retained was thought to handle
>> such large numbers of retained checkpoint so maybe there are some
>> known/unknown limitations. Stefan, do you know something in this regard?
>>
>> Parallel thing to do is that like for any other akka timeout, you should
>> track down the root cause of it. This one warning line doesn’t tell much.
>> From where does it come from? Client log? Job manager log? Task manager
>> log? Please search on the opposite side of the time outing connection for
>> possible root cause of the timeout including:
>> - possible error/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural
>> gaps in the logs)
>> - machine health (CPU usage, disks usage, network connections)
>>
>> Piotrek
>>
>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <
>> jose.fernandez@rovio.com> wrote:
>>
>> Hello,
>>
>> I have several stream jobs running (v. 1.3.1 ) in production which always
>> fails after a fixed period of around 30h after being executing. That's the
>> WARN trace before failing:
>>
>> Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
>>
>>
>> The main change done in the job configuration was to increase the
>> state.checkpoints.num-retained from 1 to *2880*. I am using asynchronous
>> RocksDB to persists to snapshot the state. (I attach some screenshots with
>> the  checkpoint conf from webUI)
>>
>>
>>    - May my assumption be correct that the increase of
>>    checkpoints.num-retained is causing the problem? Any known issue regarding
>>    this?
>>
>>
>>    - Besides, Is there any way to increase the Akka handshake timeout
>>    from the current 20000 ms to a higher value? I considered that it may be
>>    convenient to increase the timeout to 1 minute instead.
>>
>>
>> BR
>>
>>
>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at
>> 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
>>
>>
>>
>>
>
>

Re: Stream job failed after increasing number retained checkpoints

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

This Task Manager log suggests that the problem lies on the Job Manager side (no visible gap in the logs, the GC time reported is cumulative, and 31 seconds accumulated over 923 GC collections is a low value). Could you show the Job Manager log itself? Probably it’s the one that’s causing the TaskManager to time out.

On the other hand, I see that the Task Manager max heap size is ~5GB and I assume this is the same setting for the Job Manager. As Stefan pointed out, there is some memory overhead on the Job Manager for each retained checkpoint, around a couple of hundred bytes (maybe even 1KB) per operator instance. Doing some quick math:

2880 checkpoints * 10 task managers * 10 operators in the job * 8 parallelism per task manager * 500 bytes = ~1GB

The answer might be that you just need to increase the Job Manager max heap to retain 2880 checkpoints.
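
For example (the value here is only illustrative - size it from your own measurements), in flink-conf.yaml:

    jobmanager.heap.mb: 4096

or, since you run on YARN, the corresponding JobManager container memory via the -jm option of yarn-session.sh / -yjm of flink run.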

Piotrek

> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <jo...@rovio.com> wrote:
> 
> Hi,
> 
> I wonder what reason you might have that you ever want such a huge number of retained checkpoints? 
> 
> The Flink jobs running on EMR cluster require a checkpoint at midnight. (In our use case we need to synch a loaded delta to our a third party partner with the streamed data). The delta load the whole day data and that's why we wanted to have available the midnight's checkpoint to start from there.
> We could also make a savepoint at midnight, but it’s not as handy (we would need to build our own tooling to do it), and it can’t benefit from the smaller latency of an incremental checkpoint. Another thining is that implementing our own savepoint tool is a bit hard to monitor. Besides, retaining several having checkpoints created every minute is that it would also allow us to load a delta at any time. Please, if there are better ways of achieving this, let me know.
> 
> From where does the log trace come from?  
> 
> It comes from the TaskManager.  
> 
> Please search on the opposite side of the time outing connection for possible root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
> - machine health (CPU usage, disks usage, network connections)
> 
> It seems that TaskManager disconnect from JobManager and then cannot reach it again and I cannot tell the reason. I think machine health metrics mentioned above seems to be OK. Would you say Direct memory stats usage is correct? What is the way to check the GC pauses?
> Those are some traces from the TaskManager log, before/after it detached from JobManager
> 
> 2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341 <http://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/>]
> 2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager <http://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: JobManager is no longer reachable
> 2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager                         - Cancelling all computations and discarding all cached data.
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager <http://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: JobManager is no longer reachable
> 	at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
> 	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
> 	at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> 	at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
> 
> 
> José Miguel Tejedor Fernández
> Server developer
> jose.fernandez@rovio.com <ma...@rovio.com>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com <http://www.rovio.com/>
> 
> 
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> there is no known limitation in the strict sense, but you might run out of dfs space or job manager memory if you keep around a huge number checkpoints. I wonder what reason you might have that you ever want such a huge number of retained checkpoints? Usually keeping one checkpoint should do the job, maybe a couple more if you are very afraid about corruption that goes beyond your DFSs capabilities to handle it. Is there any reason for that or maybe a misconception about increasing the number of retained checkpoints is good for?
> 
> Best,
> Stefan 
> 
> 
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>>:
>> 
>> Hi,
>> 
>> Increasing akka’s timeouts is rarely a solution for any problems - it either do not help, or just mask the issue making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka>
>> 
>> I don’t think that state.checkpoints.num-retained was thought to handle such large numbers of retained checkpoint so maybe there are some known/unknown limitations. Stefan, do you know something in this regard?
>> 
>> Parallel thing to do is that like for any other akka timeout, you should track down the root cause of it. This one warning line doesn’t tell much. From where does it come from? Client log? Job manager log? Task manager log? Please search on the opposite side of the time outing connection for possible root cause of the timeout including:
>> - possible error/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
>> - machine health (CPU usage, disks usage, network connections)
>> 
>> Piotrek
>> 
>>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <jose.fernandez@rovio.com <ma...@rovio.com>> wrote:
>>> 
>>> Hello,
>>> 
>>> I have several stream jobs running (v. 1.3.1 ) in production which always fails after a fixed period of around 30h after being executing. That's the WARN trace before failing:
>>> 
>>> Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
>>> 
>>> The main change done in the job configuration was to increase the state.checkpoints.num-retained from 1 to 2880. I am using asynchronous RocksDB to persists to snapshot the state. (I attach some screenshots with the  checkpoint conf from webUI)
>>> 
>>> May my assumption be correct that the increase of checkpoints.num-retained is causing the problem? Any known issue regarding this?
>>> Besides, Is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I considered that it may be convenient to increase the timeout to 1 minute instead.
>>> 
>>> BR
>>> 
>>> 
>>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
>> 
> 
> 


Re: Stream job failed after increasing number retained checkpoints

Posted by Jose Miguel Tejedor Fernandez <jo...@rovio.com>.
Hi,

> I wonder what reason you might have that you ever want such a huge number
> of retained checkpoints?


The Flink jobs running on the EMR cluster require a checkpoint at midnight.
(In our use case we need to sync a loaded delta with the streamed data to a
third-party partner.) The delta loads the whole day's data, and that's why
we wanted to have the midnight checkpoint available to start from.
We could also make a savepoint at midnight, but it’s not as handy (we would
need to build our own tooling to do it, sketched below), and it can’t
benefit from the smaller latency of an incremental checkpoint. Another
thing is that our own savepoint tooling would be a bit harder to monitor.
Besides, retaining checkpoints created every minute would also allow us to
load a delta at any time. Please, if there are better ways of achieving
this, let me know.
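
The tooling would essentially be a scheduled call to the savepoint command
plus some monitoring around it, roughly like this (job ID, paths and any
YARN-specific flags are placeholders):

    # nightly, at midnight
    0 0 * * * /opt/flink/bin/flink savepoint <job-id> s3://<bucket>/flink-savepoints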

Where does the log trace come from?


It comes from the TaskManager.

> Please search on the opposite side of the time outing connection for
> possible root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural
> gaps in the logs)
> - machine health (CPU usage, disks usage, network connections)


It seems that the TaskManager disconnects from the JobManager and then
cannot reach it again, and I cannot tell the reason. I think the machine
health metrics mentioned above seem to be OK. Would you say the *Direct
memory stats* usage is correct? What is the way to check the GC pauses?
These are some traces from the TaskManager log, before/after it detached
from the JobManager:

2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341]
2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager - Cancelling all computations and discarding all cached data.
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).


*José Miguel Tejedor Fernández*
Server developer
jose.fernandez@rovio.com

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Hi,
>
> there is no known limitation in the strict sense, but you might run out of
> dfs space or job manager memory if you keep around a huge number
> checkpoints. I wonder what reason you might have that you ever want such a
> huge number of retained checkpoints? Usually keeping one checkpoint should
> do the job, maybe a couple more if you are very afraid about corruption
> that goes beyond your DFSs capabilities to handle it. Is there any reason
> for that or maybe a misconception about increasing the number of retained
> checkpoints is good for?
>
> Best,
> Stefan
>
>
> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com>:
>
> Hi,
>
> Increasing akka’s timeouts is rarely a solution for any problems - it
> either do not help, or just mask the issue making it less visible. But yes,
> it is possible to bump the limits: https://ci.apache.org/
> projects/flink/flink-docs-release-1.3/setup/config.html#
> distributed-coordination-via-akka
>
> I don’t think that state.checkpoints.num-retained was thought to handle
> such large numbers of retained checkpoint so maybe there are some
> known/unknown limitations. Stefan, do you know something in this regard?
>
> Parallel thing to do is that like for any other akka timeout, you should
> track down the root cause of it. This one warning line doesn’t tell much.
> From where does it come from? Client log? Job manager log? Task manager
> log? Please search on the opposite side of the time outing connection for
> possible root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural
> gaps in the logs)
> - machine health (CPU usage, disks usage, network connections)
>
> Piotrek
>
> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <
> jose.fernandez@rovio.com> wrote:
>
> Hello,
>
> I have several stream jobs running (v. 1.3.1 ) in production which always
> fails after a fixed period of around 30h after being executing. That's the
> WARN trace before failing:
>
> Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
>
>
> The main change done in the job configuration was to increase the
> state.checkpoints.num-retained from 1 to *2880*. I am using asynchronous
> RocksDB to persists to snapshot the state. (I attach some screenshots with
> the  checkpoint conf from webUI)
>
>
>    - May my assumption be correct that the increase of
>    checkpoints.num-retained is causing the problem? Any known issue regarding
>    this?
>
>
>    - Besides, Is there any way to increase the Akka handshake timeout
>    from the current 20000 ms to a higher value? I considered that it may be
>    convenient to increase the timeout to 1 minute instead.
>
>
> BR
>
>
> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at
> 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
>
>
>
>

Re: Stream job failed after increasing number retained checkpoints

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

there is no known limitation in the strict sense, but you might run out of DFS space or job manager memory if you keep around a huge number of checkpoints. I wonder what reason you might have for ever wanting such a huge number of retained checkpoints? Usually keeping one checkpoint should do the job, maybe a couple more if you are very afraid of corruption that goes beyond your DFS's capabilities to handle it. Is there any reason for that, or maybe a misconception about what increasing the number of retained checkpoints is good for?

Best,
Stefan 

> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com>:
> 
> Hi,
> 
> Increasing akka’s timeouts is rarely a solution for any problems - it either do not help, or just mask the issue making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka>
> 
> I don’t think that state.checkpoints.num-retained was thought to handle such large numbers of retained checkpoint so maybe there are some known/unknown limitations. Stefan, do you know something in this regard?
> 
> Parallel thing to do is that like for any other akka timeout, you should track down the root cause of it. This one warning line doesn’t tell much. From where does it come from? Client log? Job manager log? Task manager log? Please search on the opposite side of the time outing connection for possible root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
> - machine health (CPU usage, disks usage, network connections)
> 
> Piotrek
> 
>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <jose.fernandez@rovio.com <ma...@rovio.com>> wrote:
>> 
>> Hello,
>> 
>> I have several stream jobs running (v. 1.3.1 ) in production which always fails after a fixed period of around 30h after being executing. That's the WARN trace before failing:
>> 
>> Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
>> 
>> The main change done in the job configuration was to increase the state.checkpoints.num-retained from 1 to 2880. I am using asynchronous RocksDB to persists to snapshot the state. (I attach some screenshots with the  checkpoint conf from webUI)
>> 
>> May my assumption be correct that the increase of checkpoints.num-retained is causing the problem? Any known issue regarding this?
>> Besides, Is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I considered that it may be convenient to increase the timeout to 1 minute instead.
>> 
>> BR
>> 
>> 
>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
> 


Re: Stream job failed after increasing number retained checkpoints

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Increasing akka’s timeouts is rarely a solution for any problem - it either does not help, or just masks the issue, making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
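
The keys from that page go into flink-conf.yaml, for example along these lines (values here are only illustrative, and note that the 20 s remote handshake timeout itself is not necessarily exposed as a Flink option):

    akka.ask.timeout: 60 s
    akka.lookup.timeout: 30 s
    akka.client.timeout: 120 s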

I don’t think that state.checkpoints.num-retained was designed to handle such a large number of retained checkpoints, so maybe there are some known/unknown limitations. Stefan, do you know something in this regard?

A parallel thing to do, as for any other akka timeout, is to track down its root cause. This one warning line doesn’t tell much. Where does it come from? Client log? Job manager log? Task manager log? Please search on the opposite side of the timing-out connection for the possible root cause of the timeout, including:
- possible errors/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
- machine health (CPU usage, disk usage, network connections)

Piotrek

> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <jo...@rovio.com> wrote:
> 
> Hello,
> 
> I have several stream jobs running (v. 1.3.1 ) in production which always fails after a fixed period of around 30h after being executing. That's the WARN trace before failing:
> 
> Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876 <http://flink@ip-10-1-51-134.cloud-internal.acme.com:39876/>]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].
> 
> The main change done in the job configuration was to increase the state.checkpoints.num-retained from 1 to 2880. I am using asynchronous RocksDB to persists to snapshot the state. (I attach some screenshots with the  checkpoint conf from webUI)
> 
> May my assumption be correct that the increase of checkpoints.num-retained is causing the problem? Any known issue regarding this?
> Besides, Is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I considered that it may be convenient to increase the timeout to 1 minute instead.
> 
> BR
> 
> 
> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>