You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2020/06/29 09:38:43 UTC

Timeout when using RockDB to handle large state in a stream app

Hi community,

I am trying to run a stream application with large state in a
standalone flink cluster [3]. I configured the RocksDB state backend
and I increased the memory of the Job Manager and Task Manager.
However, I am still getting the timeout message
"java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
1.10.1 and here are the configurations that I changed on the
flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
filesystem. I am not sure if I need to use HDFS here since I am
testing only in one machine.

jobmanager.heap.size: 12g
taskmanager.memory.process.size: 8g
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink/state

In the stream application I am using RocksDB as well (full code [3]):
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));

I have some operators that hold a large state when the load a static
table on their state. I use them in two aggregate operations [1] and
[2].

[1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
[2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
[3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java

Here is my stack trace error:

org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of
TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
... 26 more

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Felipe

I'm just wandering does increase the heartbeat.timeout with
RocksDBStateBackend works for you. If not, what does the GC log say? thanks.

Best,
Congxian


Felipe Gutierrez <fe...@gmail.com> 于2020年7月7日周二 下午10:02写道:

> I figured out that for my stream job the best was just to use the
> default MemoryStateBackend. I load a table from a file of 725MB in a
> UDF. I am also not using Flink ListState since I don't have to change
> the values of this table. i only do a lookup.
>
> The only thing that I need was more memory for the TM and a bit larger
> timeout. Currently, my configurations are these. I am not sure if
> there are a better configuration
> heartbeat.timeout: 100000
> taskmanager.memory.flink.size: 12g
> taskmanager.memory.jvm-overhead.max: 4g
> taskmanager.memory.jvm-metaspace.size: 2048m # default: 1024m
>
> Another thing that is not working is this parameter that when I set it
> I get an JVM argument error and the TM does not start.
>
> taskmanager.memory.task.heap.size: 2048m # default: 1024m # Flink error
>
> Best,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Tue, Jul 7, 2020 at 2:17 PM Yun Tang <my...@live.com> wrote:
> >
> > Hi Felipe
> >
> > flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed cannot tell you
> how much memory is used by RocksDB as it mallocate memory from os directly
> instead from JVM.
> >
> > Moreover, I cannot totally understand why you ask how to increase the
> memory of the JM and TM when using the
> PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB.
> > Did you mean how to increase the total process memory? If so, as Flink
> uses managed memory to control RocksDB [1] by default, you could increase
> total memory by increasing managed memory [2][3]
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-fraction
> > [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-size
> >
> > Best,
> > Yun Tang
> >
> >
> >
> > ________________________________
> > From: Felipe Gutierrez <fe...@gmail.com>
> > Sent: Monday, July 6, 2020 19:17
> > To: Yun Tang <my...@live.com>
> > Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> > Subject: Re: Timeout when using RockDB to handle large state in a stream
> app
> >
> > Hi all,
> >
> > I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
> > the cluster with RocksDB state backend. One thing that I did that
> > improved a lot was to replace the List<LineItem> POJO to a
> > List<Tuple2<>>. Then I could load a table of 200MB in memory as my
> > state. However, the original table is 725MB, and turned out that I
> > need another configuration. I am not sure what I can do more to reduce
> > the size of my state. If one of you have an idea I am thankful to
> > hear.
> >
> > Now, speaking about the flink-conf.yaml file and the RocksDB
> > configuration. When I use these configurations on the flink-conf.yaml
> > the stream job still runs out of memory.
> > jobmanager.heap.size: 4g # default: 2048m
> > heartbeat.timeout: 100000
> > taskmanager.memory.process.size: 2g # default: 1728m
> >
> > Then I changed for this configuration which I can set
> > programmatically. The stream job seems to behave better. It starts to
> > process something, then the metrics disappear for some time and appear
> > again. The available and used memory on the TM
> > (flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
> > the available and used memory on the JM
> > (flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
> > guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
> > overwriting the configuration on the flink-conf.yaml file.
> >
> > RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir,
> true);
> >
> stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> > env.setStateBackend(stateBackend);
> >
> > How can I increase the memory of the JM and TM when I am still using
> > the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?
> >
> > [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> > [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
> > <fe...@gmail.com> wrote:
> > >
> > > yes. I agree. because RocsDB will spill data to disk if there is not
> > > enough space in memory.
> > > Thanks
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com
> > >
> > > On Fri, Jul 3, 2020 at 8:27 AM Yun Tang <my...@live.com> wrote:
> > > >
> > > > Hi Felipe,
> > > >
> > > > I noticed my previous mail has a typo: RocksDB is executed in task
> main thread which does not take the role to respond to heart beat. Sorry
> for previous typo, and the key point I want to clarify is that RocksDB
> should not have business for heartbeat problem.
> > > >
> > > > Best
> > > > Yun Tang
> > > > ________________________________
> > > > From: Felipe Gutierrez <fe...@gmail.com>
> > > > Sent: Tuesday, June 30, 2020 17:46
> > > > To: Yun Tang <my...@live.com>
> > > > Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> > > > Subject: Re: Timeout when using RockDB to handle large state in a
> stream app
> > > >
> > > > Hi,
> > > >
> > > > I reduced the size of the tables that I am loading on a ListState and
> > > > the query worked. One of them was about 700MB [1] [2].
> > > >
> > > > Now I am gonna deploy it on the cluster and check if it works. I will
> > > > probably need to increase the heartbeat timeout.
> > > >
> > > > Thanks,
> > > > Felipe
> > > > [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > > > [2]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > > > --
> > > > -- Felipe Gutierrez
> > > > -- skype: felipe.o.gutierrez
> > > > -- https://felipeogutierrez.blogspot.com
> > > >
> > > > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
> > > > >
> > > > > Hi Felipe
> > > > >
> > > > > RocksDB is executed in task main thread which does take the role
> to respond to heart beat and RocksDB mainly use native memory which is
> decoupled from JVM heap to not bring any GC pressure. Thus, timeout should
> have no relationship with RocksDB in general if your task manager is really
> heartbeat timeout instead of crash to exit.
> > > > >
> > > > > Try to increase the heartbeat timeout [1] and watch the GC detail
> logs to see anything weird.
> > > > >
> > > > > [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > > ________________________________
> > > > > From: Ori Popowski <or...@gmail.com>
> > > > > Sent: Monday, June 29, 2020 17:44
> > > > > Cc: user <us...@flink.apache.org>
> > > > > Subject: Re: Timeout when using RockDB to handle large state in a
> stream app
> > > > >
> > > > > Hi there,
> > > > >
> > > > > I'm currently experiencing the exact same issue.
> > > > >
> > > > >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > > > >
> > > > > I've found out that GC is causing the problem, but I still haven't
> managed to solve this.
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
> > > > >
> > > > > Hi community,
> > > > >
> > > > > I am trying to run a stream application with large state in a
> > > > > standalone flink cluster [3]. I configured the RocksDB state
> backend
> > > > > and I increased the memory of the Job Manager and Task Manager.
> > > > > However, I am still getting the timeout message
> > > > > "java.util.concurrent.TimeoutException: Heartbeat of TaskManager
> with
> > > > > id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> > > > > 1.10.1 and here are the configurations that I changed on the
> > > > > flink-conf.yaml. For the "state.checkpoints.dir" I am still using
> the
> > > > > filesystem. I am not sure if I need to use HDFS here since I am
> > > > > testing only in one machine.
> > > > >
> > > > > jobmanager.heap.size: 12g
> > > > > taskmanager.memory.process.size: 8g
> > > > > state.backend: rocksdb
> > > > > state.checkpoints.dir: file:///tmp/flink/state
> > > > >
> > > > > In the stream application I am using RocksDB as well (full code
> [3]):
> > > > > StreamExecutionEnvironment env =
> > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > env.setStateBackend(new
> RocksDBStateBackend("file:///tmp/flink/state", true));
> > > > >
> > > > > I have some operators that hold a large state when the load a
> static
> > > > > table on their state. I use them in two aggregate operations [1]
> and
> > > > > [2].
> > > > >
> > > > > [1]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> > > > > [2]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> > > > > [3]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> > > > >
> > > > > Here is my stack trace error:
> > > > >
> > > > > org.apache.flink.runtime.JobException: Recovery is suppressed by
> > > > > NoRestartBackoffTimeStrategy
> > > > > at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> > > > > at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> > > > > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> > > > > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> > > > > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> > > > > at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> > > > > at
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> > > > > at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> > > > > at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> > > > > at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> > > > > at
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> > > > > at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> > > > > at
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> > > > > at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> > > > > at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> > > > > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> > > > > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> > > > > at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > > > > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > > > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > > > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > > > > at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > > > at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > > > > at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > > > > at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > > > at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > > > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > > > > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > > > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > > > > at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > > at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> > > > > TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> > > > > ... 26 more
> > > > >
> > > > > Thanks,
> > > > > Felipe
> > > > > --
> > > > > -- Felipe Gutierrez
> > > > > -- skype: felipe.o.gutierrez
> > > > > -- https://felipeogutierrez.blogspot.com
>

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Felipe Gutierrez <fe...@gmail.com>.
I figured out that for my stream job the best was just to use the
default MemoryStateBackend. I load a table from a file of 725MB in a
UDF. I am also not using Flink ListState since I don't have to change
the values of this table. i only do a lookup.

The only thing that I need was more memory for the TM and a bit larger
timeout. Currently, my configurations are these. I am not sure if
there are a better configuration
heartbeat.timeout: 100000
taskmanager.memory.flink.size: 12g
taskmanager.memory.jvm-overhead.max: 4g
taskmanager.memory.jvm-metaspace.size: 2048m # default: 1024m

Another thing that is not working is this parameter that when I set it
I get an JVM argument error and the TM does not start.

taskmanager.memory.task.heap.size: 2048m # default: 1024m # Flink error

Best,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Jul 7, 2020 at 2:17 PM Yun Tang <my...@live.com> wrote:
>
> Hi Felipe
>
> flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed cannot tell you how much memory is used by RocksDB as it mallocate memory from os directly instead from JVM.
>
> Moreover, I cannot totally understand why you ask how to increase the memory of the JM and TM when using the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB.
> Did you mean how to increase the total process memory? If so, as Flink uses managed memory to control RocksDB [1] by default, you could increase total memory by increasing managed memory [2][3]
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-fraction
> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-size
>
> Best,
> Yun Tang
>
>
>
> ________________________________
> From: Felipe Gutierrez <fe...@gmail.com>
> Sent: Monday, July 6, 2020 19:17
> To: Yun Tang <my...@live.com>
> Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> Subject: Re: Timeout when using RockDB to handle large state in a stream app
>
> Hi all,
>
> I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
> the cluster with RocksDB state backend. One thing that I did that
> improved a lot was to replace the List<LineItem> POJO to a
> List<Tuple2<>>. Then I could load a table of 200MB in memory as my
> state. However, the original table is 725MB, and turned out that I
> need another configuration. I am not sure what I can do more to reduce
> the size of my state. If one of you have an idea I am thankful to
> hear.
>
> Now, speaking about the flink-conf.yaml file and the RocksDB
> configuration. When I use these configurations on the flink-conf.yaml
> the stream job still runs out of memory.
> jobmanager.heap.size: 4g # default: 2048m
> heartbeat.timeout: 100000
> taskmanager.memory.process.size: 2g # default: 1728m
>
> Then I changed for this configuration which I can set
> programmatically. The stream job seems to behave better. It starts to
> process something, then the metrics disappear for some time and appear
> again. The available and used memory on the TM
> (flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
> the available and used memory on the JM
> (flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
> guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
> overwriting the configuration on the flink-conf.yaml file.
>
> RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
> stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> env.setStateBackend(stateBackend);
>
> How can I increase the memory of the JM and TM when I am still using
> the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?
>
> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> [2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
> <fe...@gmail.com> wrote:
> >
> > yes. I agree. because RocsDB will spill data to disk if there is not
> > enough space in memory.
> > Thanks
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Fri, Jul 3, 2020 at 8:27 AM Yun Tang <my...@live.com> wrote:
> > >
> > > Hi Felipe,
> > >
> > > I noticed my previous mail has a typo: RocksDB is executed in task main thread which does not take the role to respond to heart beat. Sorry for previous typo, and the key point I want to clarify is that RocksDB should not have business for heartbeat problem.
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Felipe Gutierrez <fe...@gmail.com>
> > > Sent: Tuesday, June 30, 2020 17:46
> > > To: Yun Tang <my...@live.com>
> > > Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> > > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> > >
> > > Hi,
> > >
> > > I reduced the size of the tables that I am loading on a ListState and
> > > the query worked. One of them was about 700MB [1] [2].
> > >
> > > Now I am gonna deploy it on the cluster and check if it works. I will
> > > probably need to increase the heartbeat timeout.
> > >
> > > Thanks,
> > > Felipe
> > > [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > > [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com
> > >
> > > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
> > > >
> > > > Hi Felipe
> > > >
> > > > RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
> > > >
> > > > Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
> > > >
> > > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > > ________________________________
> > > > From: Ori Popowski <or...@gmail.com>
> > > > Sent: Monday, June 29, 2020 17:44
> > > > Cc: user <us...@flink.apache.org>
> > > > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> > > >
> > > > Hi there,
> > > >
> > > > I'm currently experiencing the exact same issue.
> > > >
> > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > > >
> > > > I've found out that GC is causing the problem, but I still haven't managed to solve this.
> > > >
> > > >
> > > >
> > > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
> > > >
> > > > Hi community,
> > > >
> > > > I am trying to run a stream application with large state in a
> > > > standalone flink cluster [3]. I configured the RocksDB state backend
> > > > and I increased the memory of the Job Manager and Task Manager.
> > > > However, I am still getting the timeout message
> > > > "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> > > > id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> > > > 1.10.1 and here are the configurations that I changed on the
> > > > flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> > > > filesystem. I am not sure if I need to use HDFS here since I am
> > > > testing only in one machine.
> > > >
> > > > jobmanager.heap.size: 12g
> > > > taskmanager.memory.process.size: 8g
> > > > state.backend: rocksdb
> > > > state.checkpoints.dir: file:///tmp/flink/state
> > > >
> > > > In the stream application I am using RocksDB as well (full code [3]):
> > > > StreamExecutionEnvironment env =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
> > > >
> > > > I have some operators that hold a large state when the load a static
> > > > table on their state. I use them in two aggregate operations [1] and
> > > > [2].
> > > >
> > > > [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> > > > [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> > > > [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> > > >
> > > > Here is my stack trace error:
> > > >
> > > > org.apache.flink.runtime.JobException: Recovery is suppressed by
> > > > NoRestartBackoffTimeStrategy
> > > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> > > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> > > > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> > > > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> > > > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> > > > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> > > > at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> > > > at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> > > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> > > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> > > > at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> > > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> > > > at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> > > > at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> > > > at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> > > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> > > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> > > > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > > > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > > > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> > > > TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> > > > ... 26 more
> > > >
> > > > Thanks,
> > > > Felipe
> > > > --
> > > > -- Felipe Gutierrez
> > > > -- skype: felipe.o.gutierrez
> > > > -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Yun Tang <my...@live.com>.
Hi Felipe

flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed cannot tell you how much memory is used by RocksDB as it mallocate memory from os directly instead from JVM.

Moreover, I cannot totally understand why you ask how to increase the memory of the JM and TM when using the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB.
Did you mean how to increase the total process memory? If so, as Flink uses managed memory to control RocksDB [1] by default, you could increase total memory by increasing managed memory [2][3]

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-fraction
[3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-size

Best,
Yun Tang



________________________________
From: Felipe Gutierrez <fe...@gmail.com>
Sent: Monday, July 6, 2020 19:17
To: Yun Tang <my...@live.com>
Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Timeout when using RockDB to handle large state in a stream app

Hi all,

I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
the cluster with RocksDB state backend. One thing that I did that
improved a lot was to replace the List<LineItem> POJO to a
List<Tuple2<>>. Then I could load a table of 200MB in memory as my
state. However, the original table is 725MB, and turned out that I
need another configuration. I am not sure what I can do more to reduce
the size of my state. If one of you have an idea I am thankful to
hear.

Now, speaking about the flink-conf.yaml file and the RocksDB
configuration. When I use these configurations on the flink-conf.yaml
the stream job still runs out of memory.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 100000
taskmanager.memory.process.size: 2g # default: 1728m

Then I changed for this configuration which I can set
programmatically. The stream job seems to behave better. It starts to
process something, then the metrics disappear for some time and appear
again. The available and used memory on the TM
(flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
the available and used memory on the JM
(flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
overwriting the configuration on the flink-conf.yaml file.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(stateBackend);

How can I increase the memory of the JM and TM when I am still using
the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
<fe...@gmail.com> wrote:
>
> yes. I agree. because RocsDB will spill data to disk if there is not
> enough space in memory.
> Thanks
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 8:27 AM Yun Tang <my...@live.com> wrote:
> >
> > Hi Felipe,
> >
> > I noticed my previous mail has a typo: RocksDB is executed in task main thread which does not take the role to respond to heart beat. Sorry for previous typo, and the key point I want to clarify is that RocksDB should not have business for heartbeat problem.
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Felipe Gutierrez <fe...@gmail.com>
> > Sent: Tuesday, June 30, 2020 17:46
> > To: Yun Tang <my...@live.com>
> > Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi,
> >
> > I reduced the size of the tables that I am loading on a ListState and
> > the query worked. One of them was about 700MB [1] [2].
> >
> > Now I am gonna deploy it on the cluster and check if it works. I will
> > probably need to increase the heartbeat timeout.
> >
> > Thanks,
> > Felipe
> > [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
> > >
> > > Hi Felipe
> > >
> > > RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
> > >
> > > Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Ori Popowski <or...@gmail.com>
> > > Sent: Monday, June 29, 2020 17:44
> > > Cc: user <us...@flink.apache.org>
> > > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> > >
> > > Hi there,
> > >
> > > I'm currently experiencing the exact same issue.
> > >
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > >
> > > I've found out that GC is causing the problem, but I still haven't managed to solve this.
> > >
> > >
> > >
> > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
> > >
> > > Hi community,
> > >
> > > I am trying to run a stream application with large state in a
> > > standalone flink cluster [3]. I configured the RocksDB state backend
> > > and I increased the memory of the Job Manager and Task Manager.
> > > However, I am still getting the timeout message
> > > "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> > > id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> > > 1.10.1 and here are the configurations that I changed on the
> > > flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> > > filesystem. I am not sure if I need to use HDFS here since I am
> > > testing only in one machine.
> > >
> > > jobmanager.heap.size: 12g
> > > taskmanager.memory.process.size: 8g
> > > state.backend: rocksdb
> > > state.checkpoints.dir: file:///tmp/flink/state
> > >
> > > In the stream application I am using RocksDB as well (full code [3]):
> > > StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
> > >
> > > I have some operators that hold a large state when the load a static
> > > table on their state. I use them in two aggregate operations [1] and
> > > [2].
> > >
> > > [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> > > [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> > > [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> > >
> > > Here is my stack trace error:
> > >
> > > org.apache.flink.runtime.JobException: Recovery is suppressed by
> > > NoRestartBackoffTimeStrategy
> > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> > > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> > > at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> > > at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> > > at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> > > at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> > > at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> > > at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> > > at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> > > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> > > TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> > > ... 26 more
> > >
> > > Thanks,
> > > Felipe
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi all,

I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
the cluster with RocksDB state backend. One thing that I did that
improved a lot was to replace the List<LineItem> POJO to a
List<Tuple2<>>. Then I could load a table of 200MB in memory as my
state. However, the original table is 725MB, and turned out that I
need another configuration. I am not sure what I can do more to reduce
the size of my state. If one of you have an idea I am thankful to
hear.

Now, speaking about the flink-conf.yaml file and the RocksDB
configuration. When I use these configurations on the flink-conf.yaml
the stream job still runs out of memory.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 100000
taskmanager.memory.process.size: 2g # default: 1728m

Then I changed for this configuration which I can set
programmatically. The stream job seems to behave better. It starts to
process something, then the metrics disappear for some time and appear
again. The available and used memory on the TM
(flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
the available and used memory on the JM
(flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
overwriting the configuration on the flink-conf.yaml file.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(stateBackend);

How can I increase the memory of the JM and TM when I am still using
the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
<fe...@gmail.com> wrote:
>
> yes. I agree. because RocsDB will spill data to disk if there is not
> enough space in memory.
> Thanks
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 8:27 AM Yun Tang <my...@live.com> wrote:
> >
> > Hi Felipe,
> >
> > I noticed my previous mail has a typo: RocksDB is executed in task main thread which does not take the role to respond to heart beat. Sorry for previous typo, and the key point I want to clarify is that RocksDB should not have business for heartbeat problem.
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Felipe Gutierrez <fe...@gmail.com>
> > Sent: Tuesday, June 30, 2020 17:46
> > To: Yun Tang <my...@live.com>
> > Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi,
> >
> > I reduced the size of the tables that I am loading on a ListState and
> > the query worked. One of them was about 700MB [1] [2].
> >
> > Now I am gonna deploy it on the cluster and check if it works. I will
> > probably need to increase the heartbeat timeout.
> >
> > Thanks,
> > Felipe
> > [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
> > >
> > > Hi Felipe
> > >
> > > RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
> > >
> > > Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Ori Popowski <or...@gmail.com>
> > > Sent: Monday, June 29, 2020 17:44
> > > Cc: user <us...@flink.apache.org>
> > > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> > >
> > > Hi there,
> > >
> > > I'm currently experiencing the exact same issue.
> > >
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > >
> > > I've found out that GC is causing the problem, but I still haven't managed to solve this.
> > >
> > >
> > >
> > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
> > >
> > > Hi community,
> > >
> > > I am trying to run a stream application with large state in a
> > > standalone flink cluster [3]. I configured the RocksDB state backend
> > > and I increased the memory of the Job Manager and Task Manager.
> > > However, I am still getting the timeout message
> > > "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> > > id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> > > 1.10.1 and here are the configurations that I changed on the
> > > flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> > > filesystem. I am not sure if I need to use HDFS here since I am
> > > testing only in one machine.
> > >
> > > jobmanager.heap.size: 12g
> > > taskmanager.memory.process.size: 8g
> > > state.backend: rocksdb
> > > state.checkpoints.dir: file:///tmp/flink/state
> > >
> > > In the stream application I am using RocksDB as well (full code [3]):
> > > StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
> > >
> > > I have some operators that hold a large state when the load a static
> > > table on their state. I use them in two aggregate operations [1] and
> > > [2].
> > >
> > > [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> > > [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> > > [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> > >
> > > Here is my stack trace error:
> > >
> > > org.apache.flink.runtime.JobException: Recovery is suppressed by
> > > NoRestartBackoffTimeStrategy
> > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> > > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> > > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> > > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> > > at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> > > at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> > > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> > > at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> > > at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> > > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> > > at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> > > at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> > > at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> > > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> > > TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> > > ... 26 more
> > >
> > > Thanks,
> > > Felipe
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Felipe Gutierrez <fe...@gmail.com>.
yes. I agree. because RocsDB will spill data to disk if there is not
enough space in memory.
Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 8:27 AM Yun Tang <my...@live.com> wrote:
>
> Hi Felipe,
>
> I noticed my previous mail has a typo: RocksDB is executed in task main thread which does not take the role to respond to heart beat. Sorry for previous typo, and the key point I want to clarify is that RocksDB should not have business for heartbeat problem.
>
> Best
> Yun Tang
> ________________________________
> From: Felipe Gutierrez <fe...@gmail.com>
> Sent: Tuesday, June 30, 2020 17:46
> To: Yun Tang <my...@live.com>
> Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
> Subject: Re: Timeout when using RockDB to handle large state in a stream app
>
> Hi,
>
> I reduced the size of the tables that I am loading on a ListState and
> the query worked. One of them was about 700MB [1] [2].
>
> Now I am gonna deploy it on the cluster and check if it works. I will
> probably need to increase the heartbeat timeout.
>
> Thanks,
> Felipe
> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
> >
> > Hi Felipe
> >
> > RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
> >
> > Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Ori Popowski <or...@gmail.com>
> > Sent: Monday, June 29, 2020 17:44
> > Cc: user <us...@flink.apache.org>
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi there,
> >
> > I'm currently experiencing the exact same issue.
> >
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> >
> > I've found out that GC is causing the problem, but I still haven't managed to solve this.
> >
> >
> >
> > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
> >
> > Hi community,
> >
> > I am trying to run a stream application with large state in a
> > standalone flink cluster [3]. I configured the RocksDB state backend
> > and I increased the memory of the Job Manager and Task Manager.
> > However, I am still getting the timeout message
> > "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> > id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> > 1.10.1 and here are the configurations that I changed on the
> > flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> > filesystem. I am not sure if I need to use HDFS here since I am
> > testing only in one machine.
> >
> > jobmanager.heap.size: 12g
> > taskmanager.memory.process.size: 8g
> > state.backend: rocksdb
> > state.checkpoints.dir: file:///tmp/flink/state
> >
> > In the stream application I am using RocksDB as well (full code [3]):
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
> >
> > I have some operators that hold a large state when the load a static
> > table on their state. I use them in two aggregate operations [1] and
> > [2].
> >
> > [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> > [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> > [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> >
> > Here is my stack trace error:
> >
> > org.apache.flink.runtime.JobException: Recovery is suppressed by
> > NoRestartBackoffTimeStrategy
> > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> > at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> > at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> > at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> > at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> > at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> > at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> > at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> > at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> > at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> > at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> > at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> > TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> > ... 26 more
> >
> > Thanks,
> > Felipe
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Yun Tang <my...@live.com>.
Hi Felipe,

I noticed my previous mail has a typo: RocksDB is executed in task main thread which does not take the role to respond to heart beat. Sorry for previous typo, and the key point I want to clarify is that RocksDB should not have business for heartbeat problem.

Best
Yun Tang
________________________________
From: Felipe Gutierrez <fe...@gmail.com>
Sent: Tuesday, June 30, 2020 17:46
To: Yun Tang <my...@live.com>
Cc: Ori Popowski <or...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Timeout when using RockDB to handle large state in a stream app

Hi,

I reduced the size of the tables that I am loading on a ListState and
the query worked. One of them was about 700MB [1] [2].

Now I am gonna deploy it on the cluster and check if it works. I will
probably need to increase the heartbeat timeout.

Thanks,
Felipe
[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
[2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
>
> Hi Felipe
>
> RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
>
> Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
>
> Best
> Yun Tang
>
> ________________________________
> From: Ori Popowski <or...@gmail.com>
> Sent: Monday, June 29, 2020 17:44
> Cc: user <us...@flink.apache.org>
> Subject: Re: Timeout when using RockDB to handle large state in a stream app
>
> Hi there,
>
> I'm currently experiencing the exact same issue.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
>
> I've found out that GC is causing the problem, but I still haven't managed to solve this.
>
>
>
> On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
>
> Hi community,
>
> I am trying to run a stream application with large state in a
> standalone flink cluster [3]. I configured the RocksDB state backend
> and I increased the memory of the Job Manager and Task Manager.
> However, I am still getting the timeout message
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> 1.10.1 and here are the configurations that I changed on the
> flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> filesystem. I am not sure if I need to use HDFS here since I am
> testing only in one machine.
>
> jobmanager.heap.size: 12g
> taskmanager.memory.process.size: 8g
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink/state
>
> In the stream application I am using RocksDB as well (full code [3]):
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
>
> I have some operators that hold a large state when the load a static
> table on their state. I use them in two aggregate operations [1] and
> [2].
>
> [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
>
> Here is my stack trace error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> ... 26 more
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi,

I reduced the size of the tables that I am loading on a ListState and
the query worked. One of them was about 700MB [1] [2].

Now I am gonna deploy it on the cluster and check if it works. I will
probably need to increase the heartbeat timeout.

Thanks,
Felipe
[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
[2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Jun 30, 2020 at 10:51 AM Yun Tang <my...@live.com> wrote:
>
> Hi Felipe
>
> RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.
>
> Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
>
> Best
> Yun Tang
>
> ________________________________
> From: Ori Popowski <or...@gmail.com>
> Sent: Monday, June 29, 2020 17:44
> Cc: user <us...@flink.apache.org>
> Subject: Re: Timeout when using RockDB to handle large state in a stream app
>
> Hi there,
>
> I'm currently experiencing the exact same issue.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
>
> I've found out that GC is causing the problem, but I still haven't managed to solve this.
>
>
>
> On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com> wrote:
>
> Hi community,
>
> I am trying to run a stream application with large state in a
> standalone flink cluster [3]. I configured the RocksDB state backend
> and I increased the memory of the Job Manager and Task Manager.
> However, I am still getting the timeout message
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> 1.10.1 and here are the configurations that I changed on the
> flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> filesystem. I am not sure if I need to use HDFS here since I am
> testing only in one machine.
>
> jobmanager.heap.size: 12g
> taskmanager.memory.process.size: 8g
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink/state
>
> In the stream application I am using RocksDB as well (full code [3]):
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));
>
> I have some operators that hold a large state when the load a static
> table on their state. I use them in two aggregate operations [1] and
> [2].
>
> [1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> [2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> [3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
>
> Here is my stack trace error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> ... 26 more
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Yun Tang <my...@live.com>.
Hi Felipe

RocksDB is executed in task main thread which does take the role to respond to heart beat and RocksDB mainly use native memory which is decoupled from JVM heap to not bring any GC pressure. Thus, timeout should have no relationship with RocksDB in general if your task manager is really heartbeat timeout instead of crash to exit.

Try to increase the heartbeat timeout [1] and watch the GC detail logs to see anything weird.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout

Best
Yun Tang

________________________________
From: Ori Popowski <or...@gmail.com>
Sent: Monday, June 29, 2020 17:44
Cc: user <us...@flink.apache.org>
Subject: Re: Timeout when using RockDB to handle large state in a stream app

Hi there,

I'm currently experiencing the exact same issue.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

I've found out that GC is causing the problem, but I still haven't managed to solve this.



On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <fe...@gmail.com>> wrote:
Hi community,

I am trying to run a stream application with large state in a
standalone flink cluster [3]. I configured the RocksDB state backend
and I increased the memory of the Job Manager and Task Manager.
However, I am still getting the timeout message
"java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
1.10.1 and here are the configurations that I changed on the
flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
filesystem. I am not sure if I need to use HDFS here since I am
testing only in one machine.

jobmanager.heap.size: 12g
taskmanager.memory.process.size: 8g
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink/state

In the stream application I am using RocksDB as well (full code [3]):
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));

I have some operators that hold a large state when the load a static
table on their state. I use them in two aggregate operations [1] and
[2].

[1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
[2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
[3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java

Here is my stack trace error:

org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of
TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
... 26 more

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Timeout when using RockDB to handle large state in a stream app

Posted by Ori Popowski <or...@gmail.com>.
Hi there,

I'm currently experiencing the exact same issue.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

I've found out that GC is causing the problem, but I still haven't managed
to solve this.



On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> Hi community,
>
> I am trying to run a stream application with large state in a
> standalone flink cluster [3]. I configured the RocksDB state backend
> and I increased the memory of the Job Manager and Task Manager.
> However, I am still getting the timeout message
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> 1.10.1 and here are the configurations that I changed on the
> flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> filesystem. I am not sure if I need to use HDFS here since I am
> testing only in one machine.
>
> jobmanager.heap.size: 12g
> taskmanager.memory.process.size: 8g
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink/state
>
> In the stream application I am using RocksDB as well (full code [3]):
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state",
> true));
>
> I have some operators that hold a large state when the load a static
> table on their state. I use them in two aggregate operations [1] and
> [2].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
>
> Here is my stack trace error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> at
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> at
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> at
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of
> TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
> ... 26 more
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>