Posted to user@flink.apache.org by Jeff Henrikson <je...@gmail.com> on 2020/06/17 17:46:54 UTC

Trouble with large state

Hello Flink users,

I have an application of around 10 enrichment joins.  All events are 
read from kafka and have event timestamps.  The joins are built using 
.cogroup, with a global window, triggering on every 1 event, plus a 
custom evictor that drops records once a newer record for the same ID 
has been processed.  Deletes are represented by empty events with 
timestamp and ID (tombstones). That way, we can drop records when 
business logic dictates, as opposed to when a maximum retention has been 
attained.  The application runs RocksDBStateBackend, on Kubernetes on 
AWS with local SSDs.

Unit tests show that the joins produce expected results.  On an 8 node 
cluster, watermark output progress seems to indicate I should be able to 
bootstrap my state of around 500GB in around 1 day.  I am able to save 
and restore savepoints for the first half an hour of run time.

My current trouble is that after around 50GB of state, I stop being able 
to reliably take checkpoints or savepoints.  Some time after that, I 
start getting a variety of failures where the first suspicious log event 
is a generic cluster connectivity error, such as:

     1) java.io.IOException: Connecting the channel failed: Connecting
     to remote task manager + '/10.67.7.101:38955' has failed. This
     might indicate that the remote task manager has been lost.

     2) org.apache.flink.runtime.io.network.netty.exception
     .RemoteTransportException: Connection unexpectedly closed by remote
     task manager 'null'. This might indicate that the remote task
     manager was lost.

     3) Association with remote system
     [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
     gated for [50] ms. Reason: [Association failed with
     [akka.tcp://flink@10.67.6.66:34987]] Caused by:
     [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum 
savable state size.

I could enable HA, but for the time being I have been leaving it out to 
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
     parallelism=8
     maxParallelism=64
     setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
     setTolerableCheckpointFailureNumber(1000)
     setMaxConcurrentCheckpoints(1)
     enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
     RocksDBStateBackend
     setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
     setNumberOfTransferThreads(25)
     setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

     jobmanager.rpc.address: localhost
     jobmanager.rpc.port: 6123
     jobmanager.heap.size: 28000m
     taskmanager.memory.process.size: 28000m
     taskmanager.memory.jvm-metaspace.size: 512m
     taskmanager.numberOfTaskSlots: 1
     parallelism.default: 1
     jobmanager.execution.failover-strategy: full

     cluster.evenly-spread-out-slots: false

     taskmanager.memory.network.fraction: 0.2           # default 0.1
     taskmanager.memory.framework.off-heap.size: 2GB
     taskmanager.memory.task.off-heap.size: 2GB
     taskmanager.network.memory.buffers-per-channel: 32 # default 2
     taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
     taskmanager.memory.task.off-heap.size: 2048MB      # default 128M

     state.backend.fs.memory-threshold: 1048576
     state.backend.fs.write-buffer-size: 10240000
     state.backend.local-recovery: true
     state.backend.rocksdb.writebuffer.size: 64MB
     state.backend.rocksdb.writebuffer.count: 8
     state.backend.rocksdb.writebuffer.number-to-merge: 4
     state.backend.rocksdb.timer-service.factory: heap
     state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
     state.backend.rocksdb.write-batch-size: 16000000 # default 2MB

     web.checkpoints.history: 250

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Hi Yun,

Thanks for your thoughts.  Answers to your questions:

 >  1. "after around 50GB of state, I stop being able to reliably take
 >     checkpoints or savepoints. "
 >     What is the exact reason that the job cannot complete a checkpoint?
 >     Expired before completing, or declined by some tasks? The former
 >     is mainly caused by high back-pressure and the latter is mainly
 >     due to some internal error.

In the UI, under Job | Checkpoints | History, then opening the
checkpoint detail, the checkpoints fail because some operators do not
acknowledge.  It's always a subset of the larger state operators
that stop acknowledging.  The exact selection of operators that stop is
nondeterministic.  The checkpoints frequently fail before any timeout
that I impose on them.

 >  2. Have you checked why the remote task manager is lost?
 >     If the remote task manager has not crashed, it might be due to GC
 >     impact. I think you might need to check the task-manager logs and
 >     GC logs.

The only general pattern I have observed is:

     1) Some taskmanager A throws one of the various connectivity
     exceptions I listed complaining about another taskmanager B.
     2) Taskmanager B shows no obvious error other than complaining
     that taskmanager A has disconnected from it.

Regards,


Jeff Henrikson



On 6/17/20 9:52 PM, Yun Tang wrote:
> Hi Jeff
> 
>  1. "after around 50GB of state, I stop being able to reliably take
>     checkpoints or savepoints. "
>     What is the exact reason that the job cannot complete a checkpoint?
>     Expired before completing, or declined by some tasks? The former
>     is mainly caused by high back-pressure and the latter is mainly
>     due to some internal error.
>  2. Have you checked why the remote task manager is lost?
>     If the remote task manager has not crashed, it might be due to GC
>     impact. I think you might need to check the task-manager logs and
>     GC logs.
> 
> Best
> Yun Tang

Re: Trouble with large state

Posted by Vijay Bhaskar <bh...@gmail.com>.
Jeff
Glad to know that you are making good progress and that the issue got resolved.

Regards
Bhaskar

On Tue, Jun 23, 2020 at 12:24 AM Jeff Henrikson <je...@gmail.com>
wrote:

> Bhaskar,
>
> I think I am unstuck.  The performance numbers I sent after throttling
> were due to a one character error in business logic.  I think I now have
> something good enough to work with for now.  I will repost if I
> encounter further unexpected issues.
>
> Adding application-level throttling ends up resolving both my symptom of
> slow/failing checkpoints, and also my symptom of crashes during long runs.
>
> Many thanks!
>
>
> Jeff
>
>
> On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> > Bhaskar,
> >
> >  > Glad to know some progress.
> >
> > Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.
> >
> > The throttling required to avoid crashing during snapshots seems to be
> > quite different from the throttling required to avoid crashing outside
> > of snapshots. So the lowest common denominator is quite a large
> > performance penalty.
> >
> > What's worse, the rate of input that makes the snapshot performance go
> > from good to bad seems to change significantly as the state size grows.
> > Here is checkpoint history from an overnight run.
> >
> > Parameters:
> >
> >      - 30 minutes minimum between snapshots
> >      - incremental snapshot mode
> >      - inputs throttled to 100 events per sec per input per slot,
> >        which is around 1/4 of the unthrottled throughput
> >
> > Checkpoint history:
> >
> >      ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
> >      12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
> >      11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
> >      10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
> >      9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
> >      8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
> >      7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
> >      6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
> >      5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
> >      4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
> >      3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
> >      2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
> >
> > As you can see, GB/minute varies drastically.  GB/minute also varies
> > drastically with full checkpoint mode.
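
To make the GB/minute variation concrete, here is a minimal sketch that divides the State Size column by the End to End Duration column for a few rows of the history above. It takes the State Size column at face value as the numerator, which is an assumption about what that column means in incremental mode. The class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CheckpointThroughput {
    /** Converts a duration written as hours, minutes, seconds into minutes. */
    static double minutes(int h, int m, int s) {
        return h * 60.0 + m + s / 60.0;
    }

    /** State size divided by end-to-end duration. */
    static double gbPerMinute(double stateSizeGb, double durationMinutes) {
        return stateSizeGb / durationMinutes;
    }

    public static void main(String[] args) {
        // Rows copied from the checkpoint history: ID -> {state size in GB, duration in minutes}
        Map<Integer, double[]> rows = new LinkedHashMap<>();
        rows.put(4, new double[] {12.7, minutes(0, 0, 56)});
        rows.put(6, new double[] {17.4, minutes(0, 11, 11)});
        rows.put(9, new double[] {34.1, minutes(0, 43, 35)});
        rows.put(12, new double[] {60.5, minutes(1, 44, 55)});
        for (Map.Entry<Integer, double[]> e : rows.entrySet()) {
            double[] r = e.getValue();
            System.out.printf("checkpoint %d: %.2f GB/min%n", e.getKey(), gbPerMinute(r[0], r[1]));
        }
    }
}
```

On these figures checkpoint 4 works out to roughly 13.6 GB/minute and checkpoint 12 to roughly 0.58 GB/minute, a spread of more than 20x.
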
> >
> > I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the
> > checkpoint GB/minute getting so slow, it will crash soon.
> >
> > I'm really wishing state.backend.async=false worked for
> > RocksDBStateBackend.
> >
> > I'm also wondering if my throttler would improve if I just connected to
> > the REST API to ask if any checkpoint is in progress, and then paused
> > inputs accordingly.  Effectively state.backend.async=false via hacked
> > application code.
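
A rough sketch of the REST polling idea, under stated assumptions: it assumes the checkpoint statistics endpoint GET /jobs/:jobid/checkpoints returns JSON with a counts object containing an in_progress field, and it pulls that number out with a regex only to stay inside the standard library (a JSON parser would be the sturdier choice). CheckpointProbe and its method names are hypothetical, not part of Flink.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CheckpointProbe {
    // Matches the "in_progress" counter assumed to be in the checkpoint statistics JSON.
    private static final Pattern IN_PROGRESS =
            Pattern.compile("\"in_progress\"\\s*:\\s*(\\d+)");

    /** Fetches the body of GET {restBase}/jobs/{jobId}/checkpoints as text. */
    static String fetchCheckpointStats(String restBase, String jobId) throws IOException {
        URI uri = URI.create(restBase + "/jobs/" + jobId + "/checkpoints");
        try (InputStream in = uri.toURL().openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    /** Returns the in-progress checkpoint count, or -1 if the field is absent. */
    static int inProgressCount(String json) {
        Matcher m = IN_PROGRESS.matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    /** Pause inputs only when the response positively reports a running checkpoint. */
    static boolean shouldPauseInputs(String json) {
        return inProgressCount(json) > 0;
    }
}
```

A throttler could call fetchCheckpointStats on a background thread every few seconds and consult shouldPauseInputs before emitting. Polling from inside processElement would add a network round trip per event, so the sketch keeps fetching and deciding separate.
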
> >
> >  > Where are you updating your state here? I
> >  > couldn't find any flink managed state here.
> >
> > The only updates to state I make are through the built-in
> > DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I
> > use .cogroup shows exactly two ways that .cogroup calls an
> > implementation of AppendingState.add.  I summarize those below.
> >
> > The two AppendingState subclasses invoked are HeapListState and
> > HeapReducingState.  Neither has a stability annotation on it, such as
> > MapState's @PublicEvolving.
> >
> >  > I suggested updating the flink managed state using onTimer over an
> >  > interval equal to the checkpoint interval.
> >
> > So the onTimer method, with interval set to the checkpoint interval.
> > Interesting.
> >
> > It looks like the closest class for my use case would be
> > KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
> >
> > 1) between checkpoints, read join input and write join output, by
> > loading any state reads from external state, but buffering all state
> > changes in memory in some kind of data structure.
> >
> > 2) whenever a checkpoint arrives or the memory consumed by buffered
> > writes gets too big, flush the writes to state.
> >
> > Is that the gist of the idea about .onTimer?
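
The two steps above can be sketched without any Flink dependency. In this minimal sketch a plain Map stands in for Flink managed state, and WriteBehindBuffer with all of its parameters is a hypothetical name. It deliberately ignores what happens to buffered updates when a checkpoint is taken or the job fails, which a real implementation would have to answer.

```java
import java.util.HashMap;
import java.util.Map;

/** Buffers keyed updates in memory and writes them to a backing store in batches. */
final class WriteBehindBuffer<K, V> {
    private final Map<K, V> backingStore;               // stand-in for Flink managed state
    private final Map<K, V> pending = new HashMap<>();  // updates not yet written
    private final int maxPending;
    private final long flushIntervalMs;
    private long lastFlushMs;

    WriteBehindBuffer(Map<K, V> backingStore, int maxPending, long flushIntervalMs, long nowMs) {
        this.backingStore = backingStore;
        this.maxPending = maxPending;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Reads see buffered writes first, then fall back to the backing store. */
    V get(K key) {
        V v = pending.get(key);
        return v != null ? v : backingStore.get(key);
    }

    /** Records an update; flushes if the buffer is full or the interval has elapsed. */
    void put(K key, V value, long nowMs) {
        pending.put(key, value);
        if (pending.size() >= maxPending || nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** What a timer callback would invoke: write all buffered updates at once. */
    void flush(long nowMs) {
        backingStore.putAll(pending);
        pending.clear();
        lastFlushMs = nowMs;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Repeated updates to the same key between flushes collapse into one write, which is where the saving would come from if most updates overwrite recent ones.
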
> >
> >
> > Jeff
> >
> >
> >
> > There are two paths from .coGroup to AppendingState.add
> >
> >      path 1 of 2: .coGroup to HeapListState
> >
> >          add:90, HeapListState {org.apache.flink.runtime.state.heap}
> >          processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
> >          processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >          processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >
> >              org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
> >
> >                (windowAssigner is an instance of GlobalWindows)
> >
> >                  @Override
> >                  public void processElement(StreamRecord<IN> element) throws Exception {
> >                      final Collection<W> elementWindows = windowAssigner.assignWindows(
> >                              element.getValue(), element.getTimestamp(), windowAssignerContext);
> >
> >                      //if element is handled by none of assigned elementWindows
> >                      boolean isSkippedElement = true;
> >
> >                      final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >                      if (windowAssigner instanceof MergingWindowAssigner) {
> >                  . . .
> >                      } else {
> >                          for (W window : elementWindows) {
> >
> >                              // check if the window is already inactive
> >                              if (isWindowLate(window)) {
> >                                  continue;
> >                              }
> >                              isSkippedElement = false;
> >
> >                              evictingWindowState.setCurrentNamespace(window);
> >                              evictingWindowState.add(element);
> >
> >          =>
> >
> >              org.apache.flink.runtime.state.heap.HeapListState#add:
> >                      @Override
> >                      public void add(V value) {
> >                          Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
> >
> >                          final N namespace = currentNamespace;
> >
> >                          final StateTable<K, N, List<V>> map = stateTable;
> >                          List<V> list = map.get(namespace);
> >
> >                          if (list == null) {
> >                              list = new ArrayList<>();
> >                              map.put(namespace, list);
> >                          }
> >                          list.add(value);
> >                      }
> >
> >      path 2 of 2: .coGroup to HeapReducingState
> >
> >              add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
> >              onElement:49, CountTrigger {org.apache.flink.streaming.api.windowing.triggers}
> >              onElement:898, WindowOperator$Context {org.apache.flink.streaming.runtime.operators.windowing}
> >              processElement:210, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
> >              processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >              processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >
> >              @Override
> >              public void processElement(StreamRecord<IN> element) throws Exception {
> >                  final Collection<W> elementWindows = windowAssigner.assignWindows(
> >                          element.getValue(), element.getTimestamp(), windowAssignerContext);
> >
> >                  //if element is handled by none of assigned elementWindows
> >                  boolean isSkippedElement = true;
> >
> >                  final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >                  if (windowAssigner instanceof MergingWindowAssigner) {
> >              . . .
> >                  } else {
> >                      for (W window : elementWindows) {
> >
> >                          // check if the window is already inactive
> >                          if (isWindowLate(window)) {
> >                              continue;
> >                          }
> >                          isSkippedElement = false;
> >
> >                          evictingWindowState.setCurrentNamespace(window);
> >                          evictingWindowState.add(element);
> >
> >                          triggerContext.key = key;
> >                          triggerContext.window = window;
> >                          evictorContext.key = key;
> >                          evictorContext.window = window;
> >
> >                          TriggerResult triggerResult = triggerContext.onElement(element);
> >
> >          =>
> >                  public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
> >                      return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
> >
> >          =>
> >
> >              @Override
> >              public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
> >                  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
> >                  count.add(1L);
> >
> >          =>
> >
> >              org.apache.flink.runtime.state.heap.HeapReducingState#add
> >                    @Override
> >                    public void add(V value) throws IOException {
> >
> >                        if (value == null) {
> >
> >
> >
> > On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
> >> Glad to know some progress. Where are you updating your state here? I
> >> couldn't find any flink managed state here.
> >> I suggested updating the flink managed state using onTimer over an
> >> interval equal to the checkpoint interval.
> >>
> >> In your case, since you do throttling, it helped to maintain a fixed
> >> rate per slot. Before, the rate was sporadic.
> >> It's definitely an IO bottleneck.
> >>
> >> So now you can think of decoupling the stateless scanning from the
> >> stateful joins. For example, you can keep the stateless scan as a
> >> separate Flink job and keep its output in some Kafka-like store.
> >>
> >> From there you start your stateful joins. This would help you focus
> >> on your stateful job much better.
> >>
> >> Regards
> >> Bhaskar
> >>
> >>
> >>
> >>
> >> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenrik27@gmail.com
> >> <ma...@gmail.com>> wrote:
> >>
> >>     Bhaskar,
> >>
> >>     Based on your idea of limiting input to get better checkpoint
> >>     behavior, I made a ProcessFunction that constrains the input to a
> >>     number of events per second per slot per input.  I do need to do
> >>     some stateless input scanning before joins.  The stateless part
> >>     needs to be fast and does not impact snapshots.  So I inserted the
> >>     throttling after the input preprocessing but before the stateful
> >>     transformations.  There is a significant difference in snapshot
> >>     throughput (often 5x or larger) when I change the throttle between
> >>     200 and 300 events per second (per slot per input).
> >>
> >>     Hope the throttling keeps being effective as I keep the job running
> >>     longer.
> >>
> >>     Odd.  But likely a very effective way out of my problem.
> >>
> >>     I wonder what drives it . . .  Thread contention?  IOPS contention?
> >>
> >>     See ProcessFunction code below.
> >>
> >>     Many thanks!
> >>
> >>
> >>     Jeff
> >>
> >>
> >>
> >>     import org.apache.flink.streaming.api.functions.ProcessFunction
> >>     import org.apache.flink.util.Collector
> >>
> >>     // Set eventsPerSecMax to -1 to disable the throttle
> >>     // TODO: Actual number of events can be slightly larger
> >>     // TODO: Remove pause correlation with system clock
> >>
> >>     case class Throttler[T](eventsPerSecMax : Double) extends ProcessFunction[T,T] {
> >>         // Wall-clock minute in which numEvents started counting
> >>         var minutePrev = 0
> >>         // Events seen so far in the current wall-clock minute
> >>         var numEvents = 0
> >>         def minutes() = {
> >>           val ms = System.currentTimeMillis()
> >>           (ms / 1000 / 60).toInt
> >>         }
> >>         def increment() = {
> >>           val m = minutes()
> >>           if(m != minutePrev) {
> >>             // New minute: remember it and restart the count.
> >>             // Without this assignment the count resets on every event.
> >>             minutePrev = m
> >>             numEvents = 0
> >>           }
> >>           numEvents += 1
> >>         }
> >>         // Average events per second over the current minute
> >>         def eps() = {
> >>           numEvents/60.0
> >>         }
> >>         override def processElement(x: T, ctx: ProcessFunction[T,T]#Context, out: Collector[T]): Unit = {
> >>           increment()
> >>           // Once over budget, pause one second per event until the minute rolls over
> >>           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
> >>             Thread.sleep(1000L)
> >>           }
> >>           out.collect(x)
> >>         }
> >>     }
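
The two TODOs in the Throttler (the count can overshoot, and pauses line up with wall-clock minutes) could be addressed by pacing events individually instead of counting per minute. This is a minimal sketch of that alternative, not the code used in this thread. PacingThrottle is a hypothetical name, and the clock is passed in so the pacing logic can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;

/** Spaces events evenly: each event reserves the next free time slot. */
final class PacingThrottle {
    private final long intervalNanos;   // minimum spacing between events; 0 disables the throttle
    private long nextFreeNanos;         // earliest time the next event may be emitted

    PacingThrottle(double eventsPerSecMax, long nowNanos) {
        this.intervalNanos = eventsPerSecMax > 0 ? (long) (1_000_000_000L / eventsPerSecMax) : 0L;
        this.nextFreeNanos = nowNanos;
    }

    /** Reserves a slot and returns how long the caller should wait, in nanoseconds. */
    long reserve(long nowNanos) {
        if (intervalNanos == 0L) {
            return 0L;
        }
        long slot = Math.max(nextFreeNanos, nowNanos);
        nextFreeNanos = slot + intervalNanos;
        return slot - nowNanos;
    }

    /** Blocking wrapper around reserve() that uses System.nanoTime() as the clock. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

Inside processElement the call would be acquire() before out.collect(x), with the instance created using System.nanoTime() as the starting time. Like the original, it blocks the task thread while waiting.
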
> >>
> >>     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> >>      > Bhaskar,
> >>      >
> >>      > Thank you for your thoughtful points.
> >>      >
> >>      >  > I want to discuss more on points (1) and (2)
> >>      >  > If we take care of them  rest will be good
> >>      >  >
> >>      >  > Coming to (1)
> >>      >  >
> >>      >  > Please try to give a reasonable checkpoint interval for every job.
> >>      >  > Minimum checkpoint interval recommended by the Flink community is
> >>      >  > 3 minutes. I think you should give a minimum 3 minute checkpoint
> >>      >  > interval for all
> >>      >
> >>      > I have spent very little time testing with checkpoint intervals
> >>      > of under 3 minutes.  I frequently test with intervals of 5
> >>      > minutes and of 30 minutes.  I also test with checkpoint
> >>      > intervals such as 60 minutes, and never (manual only).  In terms
> >>      > of which exceptions get thrown, I don't see much difference
> >>      > between 5/30/60.
> >>      >
> >>      > Infinity (no checkpoint interval) seems to be an interesting
> >>      > value, because before crashing, it seems to process around twice
> >>      > as much state as with any finite checkpoint interval.  The
> >>      > largest savepoints I have captured have been manually triggered
> >>      > using the /jobs/:jobid/stop REST API.  I think it helps for the
> >>      > snapshot to be synchronous.
> >>      >
> >>      > One curiosity about the /jobs/:jobid/stop command is that from
> >>      > the time of the command, it often takes many minutes for the
> >>      > internal processing to stop.
> >>      >
> >>      > Another curiosity about the /jobs/:jobid/stop command is that
> >>      > sometimes following a completed savepoint, the cluster goes back
> >>      > to running!
> >>      >
> >>      >  > Coming to (2)
> >>      >  >
> >>      >  > What's your input data rate?
> >>      >
> >>      > My application involves what I will call "main" events that are
> >>      > enriched by "secondary" events.  While the secondary events have
> >>      > several different input streams, data types, and join keys, I
> >>      > will estimate the secondary events all together.  My estimate
> >>      > for input rate is as follows:
> >>      >
> >>      >      50M "main" events
> >>      >      50 secondary events for each main event, for a
> >>      >          total of around 2.5B input events
> >>      >      8 nodes
> >>      >      20 hours
> >>      >
> >>      > Combining these figures, we can estimate:
> >>      >
> >>      >      50000000*50/8/20/3600 = 4340 events/second/node
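
The estimate above can be reproduced with a few lines of arithmetic. This is only a sketch of the same calculation, counting the roughly 2.5B secondary events exactly as the formula does. InputRateEstimate and its parameter names are made up for illustration.

```java
final class InputRateEstimate {
    /** Total input events spread evenly over the nodes and the wall-clock run time. */
    static double eventsPerSecondPerNode(long mainEvents, long secondaryPerMain, int nodes, double hours) {
        double totalEvents = (double) mainEvents * secondaryPerMain;
        return totalEvents / nodes / (hours * 3600.0);
    }

    public static void main(String[] args) {
        // 50M main events, 50 secondary events each, 8 nodes, 20 hours
        System.out.printf("%.0f events/second/node%n",
                eventsPerSecondPerNode(50_000_000L, 50L, 8, 20.0));
    }
}
```
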
> >>      >
> >>      > I don't see how to act on your advice for (2).  Maybe your idea
> >>      > is that during backfill/bootstrap, I artificially throttle the
> >>      > inputs to my application?
> >>      >
> >>      > 100% of my application state is due to .cogroup, which manages a
> >>      > HeapListState on its own.  I cannot think of any controls for
> >>      > changing how .cogroup handles internal state per se.  I will
> >>      > paste below the Flink code path that .cogroup uses to update its
> >>      > internal state when it runs my application.
> >>      >
> >>      > The only control I can think of with .cogroup that indirectly
> >>      > impacts internal state is delayed triggering.
> >>      >
> >>      > Currently I use a trigger on every event, which I understand
> >>      > creates a suboptimal number of events.  I previously
> >>      > experimented with delayed triggering, but I did not get good
> >>      > results.
> >>      >
> >>      > Just now I tried ContinuousProcessingTimeTrigger again, with an
> >>      > interval of 30 seconds, rocksdb.timer-service.factory: heap,
> >>      > and a 5 minute checkpoint interval.  The first checkpoint
> >>      > failed, which has been rare when I use all the same parameters
> >>      > except for triggering on every event.  So it looks worse, not
> >>      > better.
> >>      >
> >>      > Thanks again,
> >>      >
> >>      >
> >>      > Jeff Henrikson
> >>      >
> >>      >
> >>      >
> >>      >
> >>      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
> >>      >> Thanks for the reply.
> >>      >> I want to discuss more on points (1) and (2)
> >>      >> If we take care of them  rest will be good
> >>      >>
> >>      >> Coming to (1)
> >>      >>
> >>      >> Please try to give a reasonable checkpoint interval for every job.
> >>      >> Minimum checkpoint interval recommended by the Flink community is
> >>      >> 3 minutes. I think you should give a minimum 3 minute checkpoint
> >>      >> interval for all
> >>      >>
> >>      >> Coming to (2)
> >>      >>
> >>      >> What's your input data rate?
> >>      >> For example, suppose you are seeing data at 100 msg/sec. If each
> >>      >> message changes state and you are updating the state with RocksDB,
> >>      >> it's going to create 100 rows in 1 second at the RocksDB end. If on
> >>      >> average 50 records have changed each second, even if you are using
> >>      >> RocksDB differentialstate = true, there is no use, because every
> >>      >> time 50% of the rows being added are new. So the best bet is to
> >>      >> update records with RocksDB only once in your checkpoint interval.
> >>      >> Suppose your checkpoint interval is 5 minutes. If you update
> >>      >> RocksDB state once in 5 minutes, then the rate at which new
> >>      >> records are added to RocksDB will be 1 record/5min.
> >>      >> Whereas in your original scenario, 30000 records are added to
> >>      >> RocksDB in 5 min. You can save a 1:30000 ratio of records added to
> >>      >> RocksDB, which will save a huge redundant size addition to RocksDB.
> >>      >> Ultimately your state is driven by your checkpoint interval. From
> >>      >> the input source you will go 5 min back and read the state;
> >>      >> similarly, on the RocksDB side, a state update once in 5 min
> >>      >> should work. Otherwise even if you add state there is no use.
> >>      >>
> >>      >> Regards
> >>      >> Bhaskar
> >>      >>
> >>      >> Try to update your RocksDB state at an interval equal to the
> >>      >> checkpoint interval. Otherwise, what I have observed many times
> >>      >> in my case is that the state size grows unnecessarily.
> >>      >>
> >>      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
> >>     <jehenrik27@gmail.com <ma...@gmail.com>
> >>      >> <mailto:jehenrik27@gmail.com <ma...@gmail.com>>>
> >> wrote:
> >>      >>
> >>      >>     Vijay,
> >>      >>
> >>      >>     Thanks for your thoughts.  Below are answers to your questions.
> >>      >>
> >>      >>       > 1. What's your checkpoint interval?
> >>      >>
> >>      >>     I have used many different checkpoint intervals, ranging from 5
> >>      >>     minutes to never.  I usually setMinPauseBetweenCheckpoints to
> >>      >>     the same value as the checkpoint interval.
> >>      >>
> >>      >>       > 2. How frequently are you updating the state into RocksDB?
> >>      >>
> >>      >>     My understanding is that for .cogroup:
> >>      >>
> >>      >>         - Triggers control communication outside the operator
> >>      >>         - Evictors control cleanup of internal state
> >>      >>         - Configurations like write buffer size control the
> >>      >>           frequency of state change at the storage layer
> >>      >>         - There is no control for how frequently the window state
> >>      >>           updates at the RocksDB API layer.
> >>      >>
> >>      >>     Thus, the state updates whenever data is ingested.
> >>      >>
> >>      >>       > 3. How many task managers are you using?
> >>      >>
> >>      >>     Usually I have been running with one slot per taskmanager.
> >>     28GB of
> >>      >>     usable ram on each node.
> >>      >>
> >>      >>       > 4. How much data each task manager handles while
> >> taking the
> >>      >>     checkpoint?
> >>      >>
> >>      >>     Funny you should ask.  I would be okay with zero.
> >>      >>
> >>      >>     The application I am replacing has a latency of 36-48 hours,
> >>     so if I
> >>      >>     had
> >>      >>     to fully stop processing to take every snapshot
> >>     synchronously, it
> >>      >> might
> >>      >>     be seen as totally acceptable, especially for initial
> >>     bootstrap.
> >>      >> Also,
> >>      >>     the velocity of running this backfill is approximately
> >> 115x real
> >>      >>     time on
> >>      >>     8 nodes, so the steady-state run may not exhibit the failure
> >>     mode in
> >>      >>     question at all.
> >>      >>
> >>      >>     It has come as some frustration to me that, in the case of
> >>      >>     RocksDBStateBackend, the configuration key
> >> state.backend.async
> >>      >>     effectively has no meaningful way to be false.
> >>      >>
> >>      >>     The only way I have found in the existing code to get a
> >>     behavior like
> >>      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
> >>      >> drain=false
> >>      >>     and a URL.  This method of failing fast is the way that I
> >>     discovered
> >>      >>     that I needed to increase transfer threads from the default.
> >>      >>
> >>      >>     The reason I don't just run the whole backfill and then
> >> take one
> >>      >>     snapshot is that even in the absence of checkpoints, a very
> >>     similar
> >>      >>     congestion seems to take the cluster down when I am say
> >>     20-30% of the
> >>      >>     way through my backfill.
> >>      >>
> >>      >>     Reloading from my largest feasible snapshot makes it
> >>     possible to make
> >>      >>     another snapshot a bit larger before crash, but not by much.
> >>      >>
> >>      >>     On first glance, the code change to allow
> >>     RocksDBStateBackend into a
> >>      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
> >>     I was
> >>      >>     hoping to do the initial launch of my application without
> >>     needing to
> >>      >>     modify the framework.
> >>      >>
> >>      >>     Regards,
> >>      >>
> >>      >>
> >>      >>     Jeff Henrikson
> >>      >>
> >>      >>
> >>      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> >>      >>      > For me this seems to be an IO bottleneck at your task
> >>     manager.
> >>      >>      > I have a couple of queries:
> >>      >>      > 1. What's your checkpoint interval?
> >>      >>      > 2. How frequently are you updating the state into
> RocksDB?
> >>      >>      > 3. How many task managers are you using?
> >>      >>      > 4. How much data each task manager handles while taking
> >> the
> >>      >>     checkpoint?
> >>      >>      >
> >>      >>      > For points (3) and (4), you should be very careful. I feel you
> >>      >>      > are stuck at this.
> >>      >>      > Try to scale vertically by giving each task manager more CPU
> >>      >>      > and memory.
> >>      >>      > If that is not possible, try to scale horizontally so that the
> >>      >>      > IO load on each task manager is reduced.
> >>      >>      > Apart from that, check whether there is any bottleneck in the
> >>      >>      > file system.
> >>      >>      >
> >>      >>      > Regards
> >>      >>      > Bhaskar
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com> wrote:
> >>      >>      >
> >>      >>      >     I had a similar problem.  I ended up solving it by not
> >>      >>      >     relying on checkpoints for recovery, and instead re-reading
> >>      >>      >     my input sources (in my case a kafka topic) from the
> >>      >>      >     earliest offset and rebuilding only the state I need.  I
> >>      >>      >     only need to care about the past 1 to 2 days of state, so I
> >>      >>      >     can afford to drop anything older.  My recovery time went
> >>      >>      >     from over an hour for just the first checkpoint to under 10
> >>      >>      >     minutes.
> >>      >>      >
> >>      >>      >     Tim
> >>      >>      >
> >>      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com> wrote:
> >>      >>      >
> >>      >>      >         Hi Jeff
> >>      >>      >
> >>      >>      >          1. "after around 50GB of state, I stop being able to
> >>      >>      >             reliably take checkpoints or savepoints."
> >>      >>      >             What is the exact reason that the job cannot complete
> >>      >>      >             a checkpoint? Did it expire before completing, or was
> >>      >>      >             it declined by some tasks? The former is mainly
> >>      >>      >             caused by high back-pressure, and the latter is
> >>      >>      >             mainly due to some internal error.
> >>      >>      >          2. Have you checked why the remote task manager was
> >>      >>      >             lost? If the remote task manager did not crash, it
> >>      >>      >             might be due to GC impact. I think you might need to
> >>      >>      >             check the task-manager logs and GC logs.
> >>      >>      >
> >>      >>      >         Best
> >>      >>      >         Yun Tang
> >>      >>      >
>

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Bhaskar,

I think I am unstuck.  The performance numbers I sent after throttling 
were due to a one-character error in business logic.  I think I now have 
something good enough to work with.  I will repost if I encounter 
further unexpected issues.

Adding application-level throttling ends up resolving both my symptom of 
slow/failing checkpoints, and also my symptom of crashes during long runs.
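
For readers skimming the thread, here is a minimal sketch of what application-level throttling means here, written in plain Java with no Flink dependency. It is an illustration only, not the code running in the job: the class name WindowThrottle, its parameters, and the fixed-window policy are all assumptions. The clock value is passed in so the logic can be exercised without sleeping; in a job, the returned delay would be slept off in a ProcessFunction placed between the stateless preprocessing and the stateful joins.

```java
/**
 * Hypothetical fixed-window throttle: allows at most maxPerWindow events
 * in each window of windowMillis, and tells the caller how long to pause
 * for any event beyond that budget.
 */
final class WindowThrottle {
    private final long maxPerWindow;
    private final long windowMillis;
    private boolean started = false;
    private long windowStart = 0L;
    private long count = 0L;

    WindowThrottle(long maxPerWindow, long windowMillis) {
        if (maxPerWindow <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("budget and window must be positive");
        }
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
    }

    /**
     * Registers one event arriving at nowMillis and returns the number of
     * milliseconds the caller should pause before emitting it. Returns 0
     * while the current window still has budget. An event that was told to
     * wait is not re-counted in the next window, so the real rate can be
     * slightly above the budget.
     */
    long delayFor(long nowMillis) {
        if (!started || nowMillis - windowStart >= windowMillis) {
            // First event, or the previous window has ended: open a new one.
            started = true;
            windowStart = nowMillis;
            count = 0L;
        }
        count++;
        if (count <= maxPerWindow) {
            return 0L;
        }
        // Over budget: wait until the current window ends.
        return windowStart + windowMillis - nowMillis;
    }
}
```

With a budget of 2 events per 1000 ms, a third event arriving 400 ms into the window is told to wait the remaining 600 ms.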

Many thanks!


Jeff


On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> Bhaskar,
> 
>  > Glad to know some progress.
> 
> Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.
> 
> The throttling required to avoid crashing during snapshots seems to be quite 
> different from the throttling required to avoid crashing outside of snapshots. 
> So the lowest common denominator is quite a large performance penalty.
> 
> What's worse, the rate of input that makes the snapshot performance go 
> from good to bad seems to change significantly as the state size grows. 
> Here is checkpoint history from an overnight run.
> 
> Parameters:
> 
>      - 30 minutes minimum between snapshots
>      - incremental snapshot mode
>      - inputs throttled to 100 events per sec per input per slot,
>        which is around 1/4 of the unthrottled throughput
> 
> Checkpoint history:
> 
>      ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
>      12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
>      11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
>      10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
>      9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
>      8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
>      7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
>      6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
>      5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
>      4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
>      3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
>      2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
> 
> As you can see, GB/minute varies drastically.  GB/minute also varies 
> drastically with full checkpoint mode.
> 
> I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
> checkpoint GB/minute getting so slow, it will crash soon.
> 
> I'm really wishing state.backend.async=false worked for 
> RocksDbStateBackend.
> 
> I'm also wondering if my throttler would improve if I just connected to 
> the REST api to ask if any checkpoint is in progress, and then paused 
> inputs accordingly.  Effectively state.backend.async=false via hacked 
> application code.
> 
>  > Where are you updating your state here? I
>  > couldn't find any flink managed state here.
> 
> The only updates to state I make are through the built-in 
> DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
> use .cogroup shows exactly two ways that .cogroup calls an 
> implementation of AppendingState.add.  I summarize those below.
> 
> The two AppendingState subclasses invoked are HeapListState and 
> HeapReducingState.  Neither has a support annotation on it, such as 
> MapState's @PublicEvolving.
> 
>  > I suggested updating the flink managed state using onTimer over an
>  > interval equal to the checkpoint interval.
> 
> So the onTimer method, with interval set to the checkpoint interval. 
> Interesting.
> 
> It looks like the closest class for my use case would be 
> KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
> 
> 1) between checkpoints, read join input and write join output, by 
> loading any state reads from external state, but buffering all state 
> changes in memory in some kind of data structure.
> 
> 2) whenever a checkpoint arrives or the memory consumed by buffered 
> writes gets too big, flush the writes to state.
> 
> Is that the gist of the idea about .onTimer?
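
The two steps above can be sketched roughly as follows, in plain Java with no Flink dependency. This is only an illustration of the buffering idea under stated assumptions: BufferedStateWriter is a hypothetical name, the backing map stands in for Flink managed state, and deletes (tombstones) are not handled. In a real job flush() would be driven by a timer and would also have to run before each snapshot, since anything still buffered would otherwise be missing from the checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical write buffer in front of a slower key/value store.
 * Reads see buffered values first; writes reach the backing store only
 * on flush(), so repeated updates to one key cost a single backing write.
 */
final class BufferedStateWriter<K, V> {
    private final Map<K, V> backing;                 // stand-in for managed state
    private final Map<K, V> pending = new HashMap<>();
    private final int maxPending;                    // flush when this many keys are buffered
    private long backingWrites = 0L;

    BufferedStateWriter(Map<K, V> backing, int maxPending) {
        if (maxPending <= 0) {
            throw new IllegalArgumentException("maxPending must be positive");
        }
        this.backing = backing;
        this.maxPending = maxPending;
    }

    /** Step 1: serve reads from the buffer, falling back to the backing store. */
    V get(K key) {
        V buffered = pending.get(key);
        return buffered != null ? buffered : backing.get(key);
    }

    /** Step 1: buffer the write; step 2: flush early if the buffer is too big. */
    void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= maxPending) {
            flush();
        }
    }

    /** Step 2: push every buffered key to the backing store, one write per key. */
    void flush() {
        for (Map.Entry<K, V> e : pending.entrySet()) {
            backing.put(e.getKey(), e.getValue());
            backingWrites++;
        }
        pending.clear();
    }

    long backingWrites() {
        return backingWrites;
    }
}
```

Thirty thousand updates to one ID between flushes then cost a single write to the backing store, which is the 30000-to-1 reduction Bhaskar described.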
> 
> 
> Jeff
> 
> 
> 
> There are two paths from .coGroup to AppendingState.add
> 
>      path 1 of 2: .coGroup to HeapListState
> 
>          add:90, HeapListState {org.apache.flink.runtime.state.heap}
>          processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
>          processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>          processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> 
>              org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
>                (windowAssigner is an instance of GlobalWindows)
> 
>                  @Override
>                  public void processElement(StreamRecord<IN> element) throws Exception {
>                      final Collection<W> elementWindows = windowAssigner.assignWindows(
>                              element.getValue(), element.getTimestamp(), windowAssignerContext);
> 
>                      //if element is handled by none of assigned elementWindows
>                      boolean isSkippedElement = true;
> 
>                      final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> 
>                      if (windowAssigner instanceof MergingWindowAssigner) {
>                  . . .
>                      } else {
>                          for (W window : elementWindows) {
> 
>                              // check if the window is already inactive
>                              if (isWindowLate(window)) {
>                                  continue;
>                              }
>                              isSkippedElement = false;
> 
>                              evictingWindowState.setCurrentNamespace(window);
>                              evictingWindowState.add(element);
> 
>          =>
> 
>              org.apache.flink.runtime.state.heap.HeapListState#add:
>                      @Override
>                      public void add(V value) {
>                          Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
> 
>                          final N namespace = currentNamespace;
> 
>                          final StateTable<K, N, List<V>> map = stateTable;
>                          List<V> list = map.get(namespace);
> 
>                          if (list == null) {
>                              list = new ArrayList<>();
>                              map.put(namespace, list);
>                          }
>                          list.add(value);
>                      }
> 
>      path 2 of 2: .coGroup to HeapReducingState
> 
>              add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
>              onElement:49, CountTrigger {org.apache.flink.streaming.api.windowing.triggers}
>              onElement:898, WindowOperator$Context {org.apache.flink.streaming.runtime.operators.windowing}
>              processElement:210, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
>              processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>              processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> 
>              @Override
>              public void processElement(StreamRecord<IN> element) throws Exception {
>                  final Collection<W> elementWindows = windowAssigner.assignWindows(
>                          element.getValue(), element.getTimestamp(), windowAssignerContext);
> 
>                  //if element is handled by none of assigned elementWindows
>                  boolean isSkippedElement = true;
> 
>                  final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> 
>                  if (windowAssigner instanceof MergingWindowAssigner) {
>              . . .
>                  } else {
>                      for (W window : elementWindows) {
> 
>                          // check if the window is already inactive
>                          if (isWindowLate(window)) {
>                              continue;
>                          }
>                          isSkippedElement = false;
> 
>                          evictingWindowState.setCurrentNamespace(window);
>                          evictingWindowState.add(element);
> 
>                          triggerContext.key = key;
>                          triggerContext.window = window;
>                          evictorContext.key = key;
>                          evictorContext.window = window;
> 
>                          TriggerResult triggerResult = triggerContext.onElement(element);
> 
>          =>
>                  public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
>                      return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
> 
>          =>
> 
>              @Override
>              public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
>                  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>                  count.add(1L);
>                  count.add(1L);
> 
>          =>
> 
>              org.apache.flink.runtime.state.heap.HeapReducingState#add
>                    @Override
>                    public void add(V value) throws IOException {
> 
>                        if (value == null) {
> 
> 
> 
> On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
>> Glad to know some progress. Where are you updating your state here? I 
>> couldn't find any flink managed state here.
>> I suggested updating the flink managed state using onTimer over an 
>> interval equal to the checkpoint interval.
>>
>> In your case since you do throttling, it helped to maintain the fixed 
>> rate per slot. Before the rate was sporadic.
>> It's definitely an IO bottleneck.
>>
>> So now you can think of decoupling stateless scanning and stateful joins.
>> For example you can keep a stateless scan as separate flink job and 
>> keep its output in some Kafka kind of store.
>>
>>  From there you start your stateful joins. This would help focussing 
>> on your stateful job in much better fashion
>>
>> Regards
>> Bhaskar
>>
>>
>>
>>
>> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenrik27@gmail.com 
>> <ma...@gmail.com>> wrote:
>>
>>     Bhaskar,
>>
>>     Based on your idea of limiting input to get better checkpoint behavior,
>>     I made a ProcessFunction that constrains input to a number of events per
>>     second per slot per input.  I do need to do some stateless input
>>     scanning before joins.  The stateless part needs to be fast and does not
>>     impact snapshots.  So I inserted the throttling after the input
>>     preprocessing but before the stateful transformations.  There is a
>>     significant difference in snapshot throughput (often 5x or larger) when
>>     I change the throttle between 200 and 300 events per second (per slot
>>     per input).
>>
>>     Hope the throttling keeps being effective as I keep the job running
>>     longer.
>>
>>     Odd.  But likely a very effective way out of my problem.
>>
>>     I wonder what drives it . . .  Thread contention?  IOPS contention?
>>
>>     See ProcessFunction code below.
>>
>>     Many thanks!
>>
>>
>>     Jeff
>>
>>
>>
>>     import org.apache.flink.streaming.api.functions.ProcessFunction
>>     import org.apache.flink.util.Collector
>>
>>     // Set eventsPerSecMax to -1 to disable the throttle
>>     // TODO: Actual number of events can be slightly larger
>>     // TODO: Remove pause correlation with system clock
>>
>>     case class Throttler[T](eventsPerSecMax : Double) extends ProcessFunction[T,T] {
>>         var minutePrev = 0
>>         var numEvents = 0
>>         // Current wall-clock time in whole minutes since the epoch
>>         def minutes() = {
>>           val ms = System.currentTimeMillis()
>>           (ms / 1000 / 60).toInt
>>         }
>>         // Count one event, restarting the count when a new minute begins
>>         def increment() = {
>>           val m = minutes()
>>           if(m != minutePrev) {
>>             numEvents = 0
>>             minutePrev = m
>>           }
>>           numEvents += 1
>>         }
>>         // Events per second, averaged over the current minute
>>         def eps() = {
>>           numEvents/60.0
>>         }
>>         override def processElement(x: T, ctx: ProcessFunction[T,T]#Context, out: Collector[T]): Unit = {
>>           increment()
>>           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
>>             Thread.sleep(1000L)
>>           }
>>           out.collect(x)
>>         }
>>     }
>>
>>     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
>>      > Bhaskar,
>>      >
>>      > Thank you for your thoughtful points.
>>      >
>>      >  > I want to discuss more on points (1) and (2)
>>      >  > If we take care of them  rest will be good
>>      >  >
>>      >  > Coming to (1)
>>      >  >
>>      >  > Please try to give a reasonable checkpoint interval for every job.
>>      >  > The minimum checkpoint interval recommended by the flink community
>>      >  > is 3 minutes.  I think you should give a minimum 3 minute checkpoint
>>      >  > interval for all
>>      >
>>      > I have spent very little time testing with checkpoint intervals of
>>      > under 3 minutes.  I frequently test with intervals of 5 minutes and of
>>      > 30 minutes.  I also test with checkpoint intervals such as 60 minutes,
>>      > and never (manual only).  In terms of which exceptions get thrown, I
>>      > don't see much difference between 5/30/60.
>>      >
>>      > Infinity (no checkpoint interval) seems to be an interesting value,
>>      > because before crashing, it seems to process around twice as much
>>      > state as with any finite checkpoint interval.  The largest savepoints
>>      > I have captured have been manually triggered using the
>>      > /jobs/:jobid/stop REST API.  I think it helps for the snapshot to be
>>      > synchronous.
>>      >
>>      > One curiosity about the /jobs/:jobid/stop command is that from the
>>      > time of the command, it often takes many minutes for the internal
>>      > processing to stop.
>>      >
>>      > Another curiosity about the /jobs/:jobid/stop command is that
>>      > sometimes following a completed savepoint, the cluster goes back to
>>      > running!
>>      >  > Coming to (2)
>>      >  >
>>      >  > What's your input data rate?
>>      >
>>      > My application involves what I will call "main" events that are
>>     enriched
>>      > by "secondary" events.  While the secondary events have several
>>      > different input streams, data types, and join keys, I will
>>     estimate the
>>      > secondary events all together.  My estimate for input rate is as
>>     follows:
>>      >
>>      >      50M "main" events
>>      >      50 secondary events for each main event, for a
>>      >          total of around 2.5B input events
>>      >      8 nodes
>>      >      20 hours
>>      >
>>      > Combining these figures, we can estimate:
>>      >
>>      >      50000000*50/8/20/3600 = 4340 events/second/node
>>      >
>>      > I don't see how to act on your advice for (2).  Maybe your idea
>>     is that
>>      > during backfill/bootstrap, I artificially throttle the inputs 
>> to my
>>      > application?
>>      >
>>      > 100% of my application state is due to .cogroup, which manages a
>>      > HeapListState on its own.  I cannot think of any controls for
>>     changing
>>      > how .cogroup handles internal state per se.  I will paste below 
>> the
>>      > Flink code path that .cogroup uses to update its internal state
>>     when it
>>      > runs my application.
>>      >
>>      > The only control I can think of with .cogroup that indirectly
>>     impacts
>>      > internal state is delayed triggering.
>>      >
>>      > Currently I use a trigger on every event, which I understand
>>     creates a
>>      > suboptimal number of events.  I previously experimented with 
>> delayed
>>      > triggering, but I did not get good results.
>>      >
>>      > Just now I tried again ContinuousProcessingTimeTrigger of 30
>>     seconds,
>>      > with rocksdb.timer-service.factory: heap, and a 5 minute 
>> checkpoint
>>      > interval.  The first checkpoint failed, which has been rare when
>>     I use
>>      > all the same parameters except for triggering on every event.  
>> So it
>>      > looks worse not better.
>>      >
>>      > Thanks again,
>>      >
>>      >
>>      > Jeff Henrikson
>>      >
>>      >
>>      >
>>      >
>>      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>>      >> Thanks for the reply.
>>      >> I want to discuss more on points (1) and (2)
>>      >> If we take care of them  rest will be good
>>      >>
>>      >> Coming to (1)
>>      >>
>>      >> Please try to give a reasonable checkpoint interval for every job.
>>      >> The minimum checkpoint interval recommended by the Flink community
>>      >> is 3 minutes. I think you should give a checkpoint interval of at
>>      >> least 3 minutes for all jobs.
>>      >>
>>      >> Coming to (2)
>>      >>
>>      >> What's your input data rate?
>>      >> For example you are seeing data at 100 msg/sec, For each 
>> message if
>>      >> there is state changing and you are updating the state with
>>     RocksDB,
>>      >> it's going to
>>      >> create 100 rows in 1 second at RocksDb end, On the average if 50
>>      >> records have changed each second, even if you are using RocksDB
>>      >> differentialstate = true,
>>      >> there is no use. Because everytime 50% is new rows getting
>>     added. So
>>      >> the best bet is to update records with RocksDB only once in your
>>      >> checkpoint interval.
>>      >> Suppose your checkpoint interval is 5 minutes. If you update
>>     RocksDB
>>      >> state once in 5 minutes, then the rate at which new records
>>     added to
>>      >> RocksDB  will be 1 record/5min.
>>      >> Whereas in your original scenario, 30000 records added to
>>     rocksDB in 5
>>      >> min. You can save 1:30000 ratio of records in addition to 
>> RocksDB.
>>      >> Which will save a huge
>>      >> redundant size addition to RocksDB. Ultimately your  state is
>>     driven
>>      >> by your checkpoint interval. From the input source you will go
>>     back 5
>>      >> min back and read the state, similarly from RocksDB side
>>      >> also you can have a state update once in 5 min should work.
>>     Otherwise
>>      >> even if you add state there is no use.
>>      >>
>>      >> Regards
>>      >> Bhaskar
>>      >>
>>      >> Try to update your RocksDB state in an interval equal to the
>>      >> checkpoint interval. Otherwise in my case many times what's
>>     observed is
>>      >> state size grows unnecessarily.
>>      >>
>>      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenrik27@gmail.com> wrote:
>>      >>
>>      >>     Vijay,
>>      >>
>>      >>     Thanks for your thoughts.  Below are answers to your 
>> questions.
>>      >>
>>      >>       > 1. What's your checkpoint interval?
>>      >>
>>      >>     I have used many different checkpoint intervals, ranging from 5
>>      >>     minutes to never.  I usually setMinPauseBetweenCheckpoints to the
>>      >>     same value as the checkpoint interval.
>>      >>
>>      >>       > 2. How frequently are you updating the state into 
>> RocksDB?
>>      >>
>>      >>     My understanding is that for .cogroup:
>>      >>
>>      >>         - Triggers control communication outside the operator
>>      >>         - Evictors control cleanup of internal state
>>      >>         - Configurations like write buffer size control the
>>     frequency of
>>      >>     state change at the storage layer
>>      >>         - There is no control for how frequently the window state
>>      >>     updates at the RocksDB API layer.
>>      >>
>>      >>     Thus, the state updates whenever data is ingested.
>>      >>
>>      >>       > 3. How many task managers are you using?
>>      >>
>>      >>     Usually I have been running with one slot per taskmanager. 
>>     28GB of
>>      >>     usable ram on each node.
>>      >>
>>      >>       > 4. How much data each task manager handles while 
>> taking the
>>      >>     checkpoint?
>>      >>
>>      >>     Funny you should ask.  I would be okay with zero.
>>      >>
>>      >>     The application I am replacing has a latency of 36-48 hours,
>>     so if I
>>      >>     had
>>      >>     to fully stop processing to take every snapshot
>>     synchronously, it
>>      >> might
>>      >>     be seen as totally acceptable, especially for initial
>>     bootstrap.
>>      >> Also,
>>      >>     the velocity of running this backfill is approximately 
>> 115x real
>>      >>     time on
>>      >>     8 nodes, so the steady-state run may not exhibit the failure
>>     mode in
>>      >>     question at all.
>>      >>
>>      >>     It has come as some frustration to me that, in the case of
>>      >>     RocksDBStateBackend, the configuration key 
>> state.backend.async
>>      >>     effectively has no meaningful way to be false.
>>      >>
>>      >>     The only way I have found in the existing code to get a
>>     behavior like
>>      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
>>      >> drain=false
>>      >>     and a URL.  This method of failing fast is the way that I
>>     discovered
>>      >>     that I needed to increase transfer threads from the default.
>>      >>
>>      >>     The reason I don't just run the whole backfill and then 
>> take one
>>      >>     snapshot is that even in the absence of checkpoints, a very
>>     similar
>>      >>     congestion seems to take the cluster down when I am say
>>     20-30% of the
>>      >>     way through my backfill.
>>      >>
>>      >>     Reloading from my largest feasible snapshot makes it
>>     possible to make
>>      >>     another snapshot a bit larger before crash, but not by much.
>>      >>
>>      >>     On first glance, the code change to allow
>>     RocksDBStateBackend into a
>>      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
>>     I was
>>      >>     hoping to do the initial launch of my application without
>>     needing to
>>      >>     modify the framework.
>>      >>
>>      >>     Regards,
>>      >>
>>      >>
>>      >>     Jeff Henrikson
>>      >>
>>      >>
>>      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>>      >>      > For me this seems to be an IO bottleneck at your task
>>     manager.
>>      >>      > I have a couple of queries:
>>      >>      > 1. What's your checkpoint interval?
>>      >>      > 2. How frequently are you updating the state into RocksDB?
>>      >>      > 3. How many task managers are you using?
>>      >>      > 4. How much data each task manager handles while taking 
>> the
>>      >>     checkpoint?
>>      >>      >
>>      >>      > For points (3) and (4) , you should be very careful. I
>>     feel you
>>      >> are
>>      >>      > stuck at this.
>>      >>      > Try to scale vertically by adding more CPU and memory to
>>      >>      > each task manager.
>>      >>      > If not, try to scale horizontally so that the IO of each
>>      >>      > task manager gets reduced.
>>      >>      > Apart from that, check whether there is any bottleneck with
>>      >>      > the file system.
>>      >>      >
>>      >>      > Regards
>>      >>      > Bhaskar
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com> wrote:
>>      >>      >
>>      >>      >     I had a similar problem.   I ended up solving by not
>>      >> relying on
>>      >>      >     checkpoints for recovery and instead re-read my input
>>     sources
>>      >>     (in my
>>      >>      >     case a kafka topic) from the earliest offset and
>>     rebuilding
>>      >>     only the
>>      >>      >     state I need.  I only need to care about the past 1 
>> to 2
>>      >> days of
>>      >>      >     state so can afford to drop anything older.   My 
>> recovery
>>      >>     time went
>>      >>      >     from over an hour for just the first checkpoint to
>>     under 10
>>      >>     minutes.
>>      >>      >
>>      >>      >     Tim
>>      >>      >
>>      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com> wrote:
>>      >>      >
>>      >>      >         Hi Jeff
>>      >>      >
>>      >>      >          1. "after around 50GB of state, I stop being able to
>>      >>      >             reliably take checkpoints or savepoints. "
>>      >>      >             What is the exact reason that the job cannot
>>      >>      >             complete checkpoints? Expired before completing, or
>>      >>      >             declined by some tasks? The former is mainly caused
>>      >>      >             by high back-pressure and the latter is mainly due
>>      >>      >             to some internal error.
>>      >>      >          2. Have you checked why the remote task manager is
>>      >>      >             lost? If the remote task manager has not crashed,
>>      >>      >             it might be due to GC impact; I think you might
>>      >>      >             need to check the task-manager logs and GC logs.
>>      >>      >
>>      >>      >         Best
>>      >>      >         Yun Tang
>>      >>      >
>>      >>      >         ------------------------------------------------------------------------
>>      >>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com>
>>      >>      >         *Sent:* Thursday, June 18, 2020 1:46
>>      >>      >         *To:* user <user@flink.apache.org>
>>      >>      >         *Subject:* Trouble with large state
>>      >>      >         Hello Flink users,
>>      >>      >
>>      >>      >         I have an application of around 10 enrichment
>>     joins.  All
>>      >>     events
>>      >>      >         are
>>      >>      >         read from kafka and have event timestamps.  The
>>     joins are
>>      >>     built
>>      >>      >         using
>>      >>      >         .cogroup, with a global window, triggering on 
>> every 1
>>      >>     event, plus a
>>      >>      >         custom evictor that drops records once a newer
>>     record
>>      >> for the
>>      >>      >         same ID
>>      >>      >         has been processed.  Deletes are represented by 
>> empty
>>      >>     events with
>>      >>      >         timestamp and ID (tombstones). That way, we can 
>> drop
>>      >>     records when
>>      >>      >         business logic dictates, as opposed to when a 
>> maximum
>>      >>     retention
>>      >>      >         has been
>>      >>      >         attained.  The application runs
>>     RocksDBStateBackend, on
>>      >>      >         Kubernetes on
>>      >>      >         AWS with local SSDs.
>>      >>      >
>>      >>      >         Unit tests show that the joins produce expected
>>      >> results.     On an
>>      >>      >         8 node
>>      >>      >         cluster, watermark output progress seems to
>>     indicate I
>>      >>     should be
>>      >>      >         able to
>>      >>      >         bootstrap my state of around 500GB in around 1
>>     day.  I am
>>      >>     able
>>      >>      >         to save
>>      >>      >         and restore savepoints for the first half an hour
>>     of run
>>      >>     time.
>>      >>      >
>>      >>      >         My current trouble is that after around 50GB of
>>     state,
>>      >> I stop
>>      >>      >         being able
>>      >>      >         to reliably take checkpoints or savepoints.  Some
>>     time
>>      >> after
>>      >>      >         that, I
>>      >>      >         start getting a variety of failures where the 
>> first
>>      >>     suspicious
>>      >>      >         log event
>>      >>      >         is a generic cluster connectivity error, such as:
>>      >>      >
>>      >>      >               1) java.io.IOException: Connecting the channel failed:
>>      >>      >               Connecting to remote task manager + '/10.67.7.101:38955'
>>      >>      >               has failed. This might indicate that the remote task
>>      >>      >               manager has been lost.
>>      >>      >
>>      >>      >               2) org.apache.flink.runtime.io.network.netty.exception
>>      >>      >               .RemoteTransportException: Connection unexpectedly closed
>>      >>      >               by remote task manager 'null'. This might indicate that
>>      >>      >               the remote task manager was lost.
>>      >>      >
>>      >>      >               3) Association with remote system
>>      >>      >               [akka.tcp://flink@10.67.6.66:34987] has failed, address is
>>      >>      >               now gated for [50] ms. Reason: [Association failed with
>>      >>      >               [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>>      >>      >               [java.net.NoRouteToHostException: No route to host]
>>      >>      >
>>      >>      >         I don't see any obvious out of memory errors on 
>> the
>>      >>     TaskManager UI.
>>      >>      >
>>      >>      >         Adding nodes to the cluster does not seem to
>>     increase the
>>      >>     maximum
>>      >>      >         savable state size.
>>      >>      >
>>      >>      >         I could enable HA, but for the time being I 
>> have been
>>      >>     leaving it
>>      >>      >         out to
>>      >>      >         avoid the possibility of masking deterministic
>>     faults.
>>      >>      >
>>      >>      >         Below are my configurations.
>>      >>      >
>>      >>      >         Thanks in advance for any advice.
>>      >>      >
>>      >>      >         Regards,
>>      >>      >
>>      >>      >
>>      >>      >         Jeff Henrikson
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >         Flink version: 1.10
>>      >>      >
>>      >>      >         Configuration set via code:
>>      >>      >               parallelism=8
>>      >>      >               maxParallelism=64
>>      >>      >               setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>      >>      >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>      >>      >               setTolerableCheckpointFailureNumber(1000)
>>      >>      >               setMaxConcurrentCheckpoints(1)
>>      >>      >               enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>      >>      >               RocksDBStateBackend
>>      >>      >               setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>      >>      >               setNumberOfTransferThreads(25)
>>      >>      >               setDbStoragePath points to a local nvme SSD
>>      >>      >
>>      >>      >         Configuration in flink-conf.yaml:
>>      >>      >
>>      >>      >               jobmanager.rpc.address: localhost
>>      >>      >               jobmanager.rpc.port: 6123
>>      >>      >               jobmanager.heap.size: 28000m
>>      >>      >               taskmanager.memory.process.size: 28000m
>>      >>      >               taskmanager.memory.jvm-metaspace.size: 512m
>>      >>      >               taskmanager.numberOfTaskSlots: 1
>>      >>      >               parallelism.default: 1
>>      >>      >               jobmanager.execution.failover-strategy: full
>>      >>      >
>>      >>      >               cluster.evenly-spread-out-slots: false
>>      >>      >
>>      >>      >               taskmanager.memory.network.fraction: 0.2             # default 0.1
>>      >>      >               taskmanager.memory.framework.off-heap.size: 2GB
>>      >>      >               taskmanager.memory.task.off-heap.size: 2GB
>>      >>      >               taskmanager.network.memory.buffers-per-channel: 32   # default 2
>>      >>      >               taskmanager.memory.managed.fraction: 0.4             # docs say default 0.1, but something seems to set 0.4
>>      >>      >               taskmanager.memory.task.off-heap.size: 2048MB        # default 128M
>>      >>      >
>>      >>      >               state.backend.fs.memory-threshold: 1048576
>>      >>      >               state.backend.fs.write-buffer-size: 10240000
>>      >>      >               state.backend.local-recovery: true
>>      >>      >               state.backend.rocksdb.writebuffer.size: 64MB
>>      >>      >               state.backend.rocksdb.writebuffer.count: 8
>>      >>      >               state.backend.rocksdb.writebuffer.number-to-merge: 4
>>      >>      >               state.backend.rocksdb.timer-service.factory: heap
>>      >>      >               state.backend.rocksdb.block.cache-size: 64000000     # default 8MB
>>      >>      >               state.backend.rocksdb.write-batch-size: 16000000     # default 2MB
>>      >>      >
>>      >>      >               web.checkpoints.history: 250
>>      >>      >
>>      >>
>>


Re: Trouble with large state

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

Sorry to jump in late.
After reading the previous emails, I am making the following assumptions;
please correct me if I'm wrong:
- RocksDBStateBackend with incremental checkpoints
- at-least-once mode
- the parallelism of the stateful operators is 8
- checkpoints may take too long to complete
- the input rate is held fixed by the throttler

From the latest email, the checkpoint size grows from start to end, and the
e2e time grows also.
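
To put numbers on that growth, here is a small Java sketch that divides the
reported State Size by the End to End Duration for each row of the checkpoint
history quoted below. The class and method names are only illustrative. It
assumes that the ratio Jeff calls GB/minute is state size over end-to-end
duration; with incremental checkpoints the reported size may cover only the
newly uploaded files, so treat the result as a rough indicator.

```java
import java.util.Locale;

public class CheckpointThroughput {

    /** Converts an hours/minutes/seconds duration to seconds. */
    public static long seconds(int h, int m, int s) {
        return h * 3600L + m * 60L + s;
    }

    /** Reported state size divided by end-to-end duration, in GB per minute. */
    public static double gbPerMinute(double stateGb, long durationSeconds) {
        return stateGb / (durationSeconds / 60.0);
    }

    public static void main(String[] args) {
        // {checkpoint ID, state size in GB, hours, minutes, seconds},
        // copied from the checkpoint history in the quoted message.
        double[][] rows = {
            {2, 7.40, 0, 4, 40},
            {3, 10.8, 0, 1, 29},
            {4, 12.7, 0, 0, 56},
            {5, 14.3, 0, 4, 33},
            {6, 17.4, 0, 11, 11},
            {7, 23.1, 0, 31, 42},
            {8, 27.8, 0, 36, 43},
            {9, 34.1, 0, 43, 35},
            {10, 41.0, 1, 15, 39},
            {11, 53.3, 1, 35, 16},
            {12, 60.5, 1, 44, 55},
        };
        for (double[] r : rows) {
            long secs = seconds((int) r[2], (int) r[3], (int) r[4]);
            System.out.printf(Locale.ROOT,
                    "checkpoint %2d: %5.2f GB in %4d s -> %6.2f GB/min%n",
                    (int) r[0], r[1], secs, gbPerMinute(r[1], secs));
        }
    }
}
```

Printing the ratio per checkpoint makes it easier to see at which state size
the rate starts to fall.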

From my side: the e2e checkpoint time depends on the e2e snapshot time of
all operators. The e2e snapshot time of an operator is
${barrier_align_time} + ${sync-snapshot-time} + ${async-snapshot-time}.
For at-least-once mode, you can enable debug logging to track the barrier
alignment time.

You can find out which step is the bottleneck, and track one task to find
out the reason.
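
As a rough sketch of that bookkeeping in Java: given the three phase times
per task, pick the task with the largest total and report which phase
dominates it. The class, the field names and the numbers in main are all
hypothetical placeholders, not measurements from this job. It also assumes
the checkpoint is roughly gated by the slowest task, which ignores barrier
propagation through the job graph.

```java
import java.util.Arrays;
import java.util.List;

public class SnapshotBreakdown {

    /** Timings of one task for one checkpoint, in milliseconds. */
    public static final class TaskTiming {
        public final String task;
        public final long alignMs;
        public final long syncMs;
        public final long asyncMs;

        public TaskTiming(String task, long alignMs, long syncMs, long asyncMs) {
            this.task = task;
            this.alignMs = alignMs;
            this.syncMs = syncMs;
            this.asyncMs = asyncMs;
        }

        /** barrier alignment + sync snapshot + async snapshot. */
        public long total() {
            return alignMs + syncMs + asyncMs;
        }

        /** Name of the phase that contributes most to total(). */
        public String dominantPhase() {
            if (alignMs >= syncMs && alignMs >= asyncMs) {
                return "barrier alignment";
            }
            return syncMs >= asyncMs ? "sync snapshot" : "async snapshot";
        }
    }

    /** Returns the task with the largest total snapshot time. */
    public static TaskTiming slowest(List<TaskTiming> tasks) {
        TaskTiming worst = tasks.get(0);
        for (TaskTiming t : tasks) {
            if (t.total() > worst.total()) {
                worst = t;
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        List<TaskTiming> tasks = Arrays.asList(
                new TaskTiming("join-1 (0/8)", 1_000, 200, 30_000),
                new TaskTiming("join-1 (1/8)", 90_000, 250, 28_000));
        TaskTiming worst = slowest(tasks);
        System.out.println(worst.task + " is slowest (" + worst.total()
                + " ms), dominated by " + worst.dominantPhase());
    }
}
```

The per-task values would come from the checkpoint details in the web UI or
from the debug logs mentioned above.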

Maybe you could try:
1. Use full snapshots with RocksDBStateBackend (disable incremental
checkpoints) and see what the e2e checkpoint time will be -- this is to
verify whether there are too many incremental changes between checkpoints.
2. Place the timers on the heap and then in RocksDB, and see whether this
affects the checkpoint time -- timers on the heap affect the sync-snapshot
time.
3. Find out whether there is an IO/disk problem when snapshotting.
4. Find out whether there is a network problem when snapshotting.
5. Check whether uploading the state using multiple threads [1] helps here.


[1] https://issues.apache.org/jira/browse/FLINK-11008
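
For experiments 1, 2 and 5, the flink-conf.yaml keys below are the ones I
believe apply in Flink 1.10. This is only a sketch: the values are examples,
each experiment should be run on its own, and settings made in code (for
example the RocksDBStateBackend constructor or setNumberOfTransferThreads)
may take precedence over the file.

```yaml
# 1. Full snapshots instead of incremental ones
state.backend.incremental: false

# 2. Keep timers in RocksDB instead of on the heap (the job currently uses heap)
state.backend.rocksdb.timer-service.factory: ROCKSDB

# 5. Number of threads used to transfer state files (FLINK-11008);
#    25 mirrors the setNumberOfTransferThreads(25) already set in code
state.backend.rocksdb.checkpoint.transfer.thread.num: 25
```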

Best,
Congxian


Jeff Henrikson <je...@gmail.com> wrote on Sunday, June 21, 2020 at 2:46 AM:

> Bhaskar,
>
>  > Glad to know some progress.
>
> Yeah, some progress.  Yet overnight run didn't look as good as I hoped.
>
> The throttling required to not crash during snapshots seems to be quite
> different from the throttling required to not crash outside of snapshots.
> So the lowest common denominator is quite a large performance penalty.
>
> What's worse, the rate of input that makes the snapshot performance go
> from good to bad seems to change significantly as the state size grows.
> Here is checkpoint history from an overnight run.
>
> Parameters:
>
>      - 30 minutes minimum between snapshots
>      - incremental snapshot mode
>      - inputs throttled to 100 events per sec per input per slot,
>        which is around 1/4 of the unthrottled throughput
>
> Checkpoint history:
>
>   ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
>   12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
>   11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
>   10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
>    9  COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
>    8  COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
>    7  COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
>    6  COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
>    5  COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
>    4  COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
>    3  COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
>    2  COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
>
> As you can see, GB/minute varies drastically.  GB/minute also varies
> drastically with full checkpoint mode.
>
> I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the
> checkpoint GB/minute getting so slow, it will crash soon.
>
> I'm really wishing state.backend.async=false worked for
> RocksDbStateBackend.
>
> I'm also wondering if my throttler would improve if I just connected to
> the REST api to ask if any checkpoint is in progress, and then paused
> inputs accordingly.  Effectively state.backend.async=false via hacked
> application code.
>
>  > Where are you updating your state here? I
>  > couldn't find any flink managed state here.
>
> The only updates to state I make are through the built-in
> DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I
> use .cogroup shows exactly two ways that .cogroup calls an
> implementation of AppendingState.add.  I summarize those below.
>
> The two AppendingState subclasses invoked are HeapListState and
> HeapReducingState.  Neither has a support annotation on it, such as
> MapState's @PublicEvolving.
>
>  > I suggested updating the flink managed state using onTimer over an
>  > interval equal to the checkpoint interval.
>
> So the onTimer method, with interval set to the checkpoint interval.
> Interesting.
>
> It looks like the closest subclass for my use case would be
> KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
>
> 1) between checkpoints, read join input and write join output, by
> loading any state reads from external state, but buffering all state
> changes in memory in some kind of data structure.
>
> 2) whenever a checkpoint arrives or the memory consumed by buffered
> writes gets too big, flush the writes to state.
>
> Is that the gist of the idea about .onTimer?
>
>
> Jeff
>
>
>
> There are two paths from .coGroup to AppendingState.add
>
>      path 1 of 2: .coGroup to HeapListState
>
>          add:90, HeapListState {org.apache.flink.runtime.state.heap}
>          processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
>          processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>          processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>
>      org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
>      (windowAssigner is an instance of GlobalWindows)
>
>          @Override
>          public void processElement(StreamRecord<IN> element) throws Exception {
>              final Collection<W> elementWindows = windowAssigner.assignWindows(
>                  element.getValue(), element.getTimestamp(), windowAssignerContext);
>
>              //if element is handled by none of assigned elementWindows
>              boolean isSkippedElement = true;
>
>              final K key = this.<K>getKeyedStateBackend().getCurrentKey();
>
>              if (windowAssigner instanceof MergingWindowAssigner) {
>                  . . .
>              } else {
>                  for (W window : elementWindows) {
>
>                      // check if the window is already inactive
>                      if (isWindowLate(window)) {
>                          continue;
>                      }
>                      isSkippedElement = false;
>
>                      evictingWindowState.setCurrentNamespace(window);
>                      evictingWindowState.add(element);
>
>          =>
>
>      org.apache.flink.runtime.state.heap.HeapListState#add:
>
>          @Override
>          public void add(V value) {
>              Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
>
>              final N namespace = currentNamespace;
>
>              final StateTable<K, N, List<V>> map = stateTable;
>              List<V> list = map.get(namespace);
>
>              if (list == null) {
>                  list = new ArrayList<>();
>                  map.put(namespace, list);
>              }
>              list.add(value);
>          }
>
>      path 2 of 2: .coGroup to HeapReducingState
>
>          add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
>          onElement:49, CountTrigger {org.apache.flink.streaming.api.windowing.triggers}
>          onElement:898, WindowOperator$Context {org.apache.flink.streaming.runtime.operators.windowing}
>          processElement:210, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
>          processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>          processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
>
>          @Override
>          public void processElement(StreamRecord<IN> element) throws Exception {
>              final Collection<W> elementWindows = windowAssigner.assignWindows(
>                  element.getValue(), element.getTimestamp(), windowAssignerContext);
>
>              //if element is handled by none of assigned elementWindows
>              boolean isSkippedElement = true;
>
>              final K key = this.<K>getKeyedStateBackend().getCurrentKey();
>
>              if (windowAssigner instanceof MergingWindowAssigner) {
>                  . . .
>              } else {
>                  for (W window : elementWindows) {
>
>                      // check if the window is already inactive
>                      if (isWindowLate(window)) {
>                          continue;
>                      }
>                      isSkippedElement = false;
>
>                      evictingWindowState.setCurrentNamespace(window);
>                      evictingWindowState.add(element);
>
>                      triggerContext.key = key;
>                      triggerContext.window = window;
>                      evictorContext.key = key;
>                      evictorContext.window = window;
>
>                      TriggerResult triggerResult = triggerContext.onElement(element);
>
>          =>
>
>          public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
>              return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
>
>          =>
>
>          @Override
>          public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
>              ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>              count.add(1L);
>
>          =>
>
>              org.apache.flink.runtime.state.heap.HeapReducingState#add
>                 @Override
>                 public void add(V value) throws IOException {
>
>                         if (value == null) {
>
>
>
> On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
> > Glad to know some progress. Where are you updating your state here? I
> > couldn't find any flink managed state here.
> > I suggested updating the flink managed state using onTimer over an
> > interval equal to the checkpoint interval.
> >
> > In your case since you do throttling, it helped to maintain the fixed
> > rate per slot. Before the rate was sporadic.
> > It's definitely an IO bottleneck.
> >
> > So now you can think of decoupling stateless scanning and stateful joins.
> > For example you can keep a stateless scan as separate flink job and keep
> > its output in some Kafka kind of store.
> >
> >  From there you start your stateful joins. This would help focussing on
> > your stateful job in much better fashion
> >
> > Regards
> > Bhaskar
> >
> >
> >
> >
> > On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenrik27@gmail.com> wrote:
> >
> >     Bhaskar,
> >
> >     Based on your idea of limiting input to get better checkpoint
> behavior,
> >     I made a ProcessFunction that constrains input to a number of events
> >     per second per slot per input.  I do need to do some stateless input
> >     scanning before joins.  The stateless part needs to be fast and does
> >     not impact snapshots.  So I inserted the throttling after the input
> >     preprocessing but before the stateful transformations.  There is a
> >     significant difference of snapshot throughput (often 5x or larger)
> when
> >     I change the throttle between 200 and 300 events per second (per slot
> >     per input).
> >
> >     Hope the throttling keeps being effective as I keep the job running
> >     longer.
> >
> >     Odd.  But likely a very effective way out of my problem.
> >
> >     I wonder what drives it . . .  Thread contention?  IOPS contention?
> >
> >     See ProcessFunction code below.
> >
> >     Many thanks!
> >
> >
> >     Jeff
> >
> >
> >
> >     import org.apache.flink.streaming.api.functions.ProcessFunction
> >     import org.apache.flink.util.Collector
> >
> >     // Set eventsPerSecMax to -1 to disable the throttle
> >     // TODO: Actual number of events can be slightly larger
> >     // TODO: Remove pause correlation with system clock
> >
> >     case class Throttler[T](eventsPerSecMax : Double) extends
> >     ProcessFunction[T,T] {
> >         var minutePrev = 0
> >         var numEvents = 0
> >         def minutes() = {
> >           val ms = System.currentTimeMillis()
> >           (ms / 1000 / 60).toInt
> >         }
> >         def increment() = {
> >           val m = minutes()
> >           if(m != minutePrev) {
> >             // new minute: remember it so the count is not reset on every event
> >             minutePrev = m
> >             numEvents = 0
> >           }
> >           numEvents += 1
> >         }
> >         def eps() = {
> >           numEvents/60.0
> >         }
> >         override def processElement(x: T, ctx: ProcessFunction[T,
> >     T]#Context,
> >     out: Collector[T]): Unit = {
> >           increment()
> >           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
> >             Thread.sleep(1000L)
> >           }
> >           out.collect(x)
> >         }
> >     }
> >
> >     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> >      > Bhaskar,
> >      >
> >      > Thank you for your thoughtful points.
> >      >
> >      >  > I want to discuss more on points (1) and (2)
> >      >  > If we take care of them  rest will be good
> >      >  >
> >      >  > Coming to (1)
> >      >  >
> >      >  > Please try to give reasonable checkpoint interval time for
> >     every job.
> >      >  > Minimum checkpoint interval recommended by flink community is 3
> >     minutes
> >      >  > I think you should give minimum 3 minutes checkpoint interval
> >     for all
> >      >
> >      > I have spent very little time testing with checkpoint intervals
> >     of under
> >      > 3 minutes.  I frequently test with intervals of 5 minutes and of
> 30
> >      > minutes.  I also test with checkpoint intervals such as 60
> >     minutes, and
> >      > never (manual only).  In terms of which exceptions get thrown, I
> >      > don't see much difference between 5/30/60.
> >      >
> >      > Infinity (no checkpoint interval) seems to be an interesting
> value,
> >      > because before crashing, it seems to process around twice as much
> >     state
> >      > as with any finite checkpoint interval.  The largest savepoints I
> >     have
> >      > captured have been manually triggered using the /job/:jobid/stop
> >     REST
> >      > API.  I think it helps for the snapshot to be synchronous.
> >      >
> >      > One curiosity about the /job/:jobid/stop command is that from
> >     time of
> >      > the command, it often takes many minutes for the internal
> >     processing to
> >      > stop.
> >      >
> >      > Another curiosity about /job/:jobid/stop command is that sometimes
> >      > following a completed savepoint, the cluster goes back to running!
> >      >
> >      >  > Coming to (2)
> >      >  >
> >      >  > What's your input data rate?
> >      >
> >      > My application involves what I will call "main" events that are
> >     enriched
> >      > by "secondary" events.  While the secondary events have several
> >      > different input streams, data types, and join keys, I will
> >     estimate the
> >      > secondary events all together.  My estimate for input rate is as
> >     follows:
> >      >
> >      >      50M "main" events
> >      >      50 secondary events for each main event, for a
> >      >          total of around 2.5B input events
> >      >      8 nodes
> >      >      20 hours
> >      >
> >      > Combining these figures, we can estimate:
> >      >
> >      >      50000000*50/8/20/3600 = 4340 events/second/node
> >      >
> >      > I don't see how to act on your advice for (2).  Maybe your idea
> >     is that
> >      > during backfill/bootstrap, I artificially throttle the inputs to
> my
> >      > application?
> >      >
> >      > 100% of my application state is due to .cogroup, which manages a
> >      > HeapListState on its own.  I cannot think of any controls for
> >     changing
> >      > how .cogroup handles internal state per se.  I will paste below
> the
> >      > Flink code path that .cogroup uses to update its internal state
> >     when it
> >      > runs my application.
> >      >
> >      > The only control I can think of with .cogroup that indirectly
> >     impacts
> >      > internal state is delayed triggering.
> >      >
> >      > Currently I use a trigger on every event, which I understand
> >     creates a
> >      > suboptimal number of events.  I previously experimented with
> delayed
> >      > triggering, but I did not get good results.
> >      >
> >      > Just now I tried again ContinuousProcessingTimeTrigger of 30
> >     seconds,
> >      > with rocksdb.timer-service.factory: heap, and a 5 minute
> checkpoint
> >      > interval.  The first checkpoint failed, which has been rare when
> >     I use
> >      > all the same parameters except for triggering on every event.  So
> it
> >      > looks worse not better.
> >      >
> >      > Thanks again,
> >      >
> >      >
> >      > Jeff Henrikson
> >      >
> >      >
> >      >
> >      >
> >      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
> >      >> Thanks for the reply.
> >      >> I want to discuss more on points (1) and (2)
> >      >> If we take care of them  rest will be good
> >      >>
> >      >> Coming to (1)
> >      >>
> >      >> Please try to give reasonable checkpoint interval time for every
> >     job.
> >      >> Minimum checkpoint interval recommended by flink community is 3
> >     minutes
> >      >> I think you should give minimum 3 minutes checkpoint interval for
> all
> >      >>
> >      >> Coming to (2)
> >      >>
> >      >> What's your input data rate?
> >      >> For example you are seeing data at 100 msg/sec, For each message
> if
> >      >> there is state changing and you are updating the state with
> >     RocksDB,
> >      >> it's going to
> >      >> create 100 rows in 1 second at RocksDb end, On the average if 50
> >      >> records have changed each second, even if you are using RocksDB
> >      >> differentialstate = true,
> >      >> there is no use. Because everytime 50% is new rows getting
> >     added. So
> >      >> the best bet is to update records with RocksDB only once in your
> >      >> checkpoint interval.
> >      >> Suppose your checkpoint interval is 5 minutes. If you update
> >     RocksDB
> >      >> state once in 5 minutes, then the rate at which new records
> >     added to
> >      >> RocksDB  will be 1 record/5min.
> >      >> Whereas in your original scenario, 30000 records added to
> >     rocksDB in 5
> >      >> min. You can save 1:30000 ratio of records in addition to
> RocksDB.
> >      >> Which will save a huge
> >      >> redundant size addition to RocksDB. Ultimately your  state is
> >     driven
> >      >> by your checkpoint interval. From the input source you will go
> >     back 5
> >      >> min back and read the state, similarly from RocksDB side
> >      >> also you can have a state update once in 5 min should work.
> >     Otherwise
> >      >> even if you add state there is no use.
> >      >>
> >      >> Regards
> >      >> Bhaskar
> >      >>
> >      >> Try to update your RocksDB state in an interval equal to the
> >      >> checkpoint interval. Otherwise in my case many times what's
> >     observed is
> >      >> state size grows unnecessarily.
> >      >>
> >      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
> >     <jehenrik27@gmail.com <ma...@gmail.com>
> >      >> <mailto:jehenrik27@gmail.com <ma...@gmail.com>>>
> wrote:
> >      >>
> >      >>     Vijay,
> >      >>
> >      >>     Thanks for your thoughts.  Below are answers to your
> questions.
> >      >>
> >      >>       > 1. What's your checkpoint interval?
> >      >>
> >      >>     I have used many different checkpoint intervals, ranging
> from 5
> >      >> minutes
> >      >>     to never.  I usually setMinPauseBetweenCheckpoints to the
> same
> >      >> value as
> >      >>     the checkpoint interval.
> >      >>
> >      >>       > 2. How frequently are you updating the state into
> RocksDB?
> >      >>
> >      >>     My understanding is that for .cogroup:
> >      >>
> >      >>         - Triggers control communication outside the operator
> >      >>         - Evictors control cleanup of internal state
> >      >>         - Configurations like write buffer size control the
> >     frequency of
> >      >>     state change at the storage layer
> >      >>         - There is no control for how frequently the window state
> >      >>     updates at
> >      >>     the layer of the RocksDB api layer.
> >      >>
> >      >>     Thus, the state updates whenever data is ingested.
> >      >>
> >      >>       > 3. How many task managers are you using?
> >      >>
> >      >>     Usually I have been running with one slot per taskmanager.
> >     28GB of
> >      >>     usable ram on each node.
> >      >>
> >      >>       > 4. How much data each task manager handles while taking
> the
> >      >>     checkpoint?
> >      >>
> >      >>     Funny you should ask.  I would be okay with zero.
> >      >>
> >      >>     The application I am replacing has a latency of 36-48 hours,
> >     so if I
> >      >>     had
> >      >>     to fully stop processing to take every snapshot
> >     synchronously, it
> >      >> might
> >      >>     be seen as totally acceptable, especially for initial
> >     bootstrap.
> >      >> Also,
> >      >>     the velocity of running this backfill is approximately 115x
> real
> >      >>     time on
> >      >>     8 nodes, so the steady-state run may not exhibit the failure
> >     mode in
> >      >>     question at all.
> >      >>
> >      >>     It has come as some frustration to me that, in the case of
> >      >>     RocksDBStateBackend, the configuration key
> state.backend.async
> >      >>     effectively has no meaningful way to be false.
> >      >>
> >      >>     The only way I have found in the existing code to get a
> >     behavior like
> >      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
> >      >> drain=false
> >      >>     and a URL.  This method of failing fast is the way that I
> >     discovered
> >      >>     that I needed to increase transfer threads from the default.
> >      >>
> >      >>     The reason I don't just run the whole backfill and then take
> one
> >      >>     snapshot is that even in the absence of checkpoints, a very
> >     similar
> >      >>     congestion seems to take the cluster down when I am say
> >     20-30% of the
> >      >>     way through my backfill.
> >      >>
> >      >>     Reloading from my largest feasible snapshot makes it
> >     possible to make
> >      >>     another snapshot a bit larger before crash, but not by much.
> >      >>
> >      >>     On first glance, the code change to allow
> >     RocksDBStateBackend into a
> >      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
> >     I was
> >      >>     hoping to do the initial launch of my application without
> >     needing to
> >      >>     modify the framework.
> >      >>
> >      >>     Regards,
> >      >>
> >      >>
> >      >>     Jeff Henrikson
> >      >>
> >      >>
> >      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> >      >>      > For me this seems to be an IO bottleneck at your task
> >     manager.
> >      >>      > I have a couple of queries:
> >      >>      > 1. What's your checkpoint interval?
> >      >>      > 2. How frequently are you updating the state into RocksDB?
> >      >>      > 3. How many task managers are you using?
> >      >>      > 4. How much data each task manager handles while taking
> the
> >      >>     checkpoint?
> >      >>      >
> >      >>      > For points (3) and (4) , you should be very careful. I
> >     feel you
> >      >> are
> >      >>      > stuck at this.
> >      >>      > You try to scale vertically by increasing more CPU and
> >     memory for
> >      >>     each
> >      >>      > task manager.
> >      >>      > If not, try to scale horizontally so that each task
> >     manager IO
> >      >>     gets reduced
> >      >>      > Apart from that check is there any bottleneck with the
> file
> >      >> system.
> >      >>      >
> >      >>      > Regards
> >      >>      > Bhaskar
> >      >>      >
> >      >>      >
> >      >>      >
> >      >>      >
> >      >>      >
> >      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor
> >     <victtim@gmail.com <ma...@gmail.com>
> >      >>     <mailto:victtim@gmail.com <ma...@gmail.com>>
> >      >>      > <mailto:victtim@gmail.com <ma...@gmail.com>
> >     <mailto:victtim@gmail.com <ma...@gmail.com>>>> wrote:
> >      >>      >
> >      >>      >     I had a similar problem.   I ended up solving by not
> >      >> relying on
> >      >>      >     checkpoints for recovery and instead re-read my input
> >     sources
> >      >>     (in my
> >      >>      >     case a kafka topic) from the earliest offset and
> >     rebuilding
> >      >>     only the
> >      >>      >     state I need.  I only need to care about the past 1
> to 2
> >      >> days of
> >      >>      >     state so can afford to drop anything older.   My
> recovery
> >      >>     time went
> >      >>      >     from over an hour for just the first checkpoint to
> >     under 10
> >      >>     minutes.
> >      >>      >
> >      >>      >     Tim
> >      >>      >
> >      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang
> >     <myasuka@live.com <ma...@live.com>
> >      >>     <mailto:myasuka@live.com <ma...@live.com>>
> >      >>      >     <mailto:myasuka@live.com <ma...@live.com>
> >     <mailto:myasuka@live.com <ma...@live.com>>>> wrote:
> >      >>      >
> >      >>      >         Hi Jeff
> >      >>      >
> >      >>      >          1. "after around 50GB of state, I stop being
> able to
> >      >>     reliably
> >      >>      >             take checkpoints or savepoints. "
> >      >>      >             What is the exact reason that job cannot
> complete
> >      >>      >             checkpoint? Expired before completing or
> >     declined by
> >      >> some
> >      >>      >             tasks? The former one is mainly caused by high
> >      >>     back-pressure
> >      >>      >             and the latter one is mainly due to some
> internal
> >      >> error.
> >      >>      >          2. Have you checked what reason the remote task
> >     manager
> >      >>     is lost?
> >      >>      >             If the remote task manager is not crashed, it
> >     might
> >      >>     be due
> >      >>      >             to GC impact, I think you might need to check
> >      >>     task-manager
> >      >>      >             logs and GC logs.
> >      >>      >
> >      >>      >         Best
> >      >>      >         Yun Tang
> >      >>      >
> >      >>
> >
>  ------------------------------------------------------------------------
> >      >>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
> >     <ma...@gmail.com>
> >      >>     <mailto:jehenrik27@gmail.com <ma...@gmail.com>>
> >      >>      >         <mailto:jehenrik27@gmail.com
> >     <ma...@gmail.com>
> >      >> <mailto:jehenrik27@gmail.com <ma...@gmail.com>>>>
> >      >>      >         *Sent:* Thursday, June 18, 2020 1:46
> >      >>      >         *To:* user <user@flink.apache.org
> >     <ma...@flink.apache.org>
> >      >>     <mailto:user@flink.apache.org
> >     <ma...@flink.apache.org>> <mailto:user@flink.apache.org
> >     <ma...@flink.apache.org>
> >      >>     <mailto:user@flink.apache.org <mailto:user@flink.apache.org
> >>>>
> >      >>      >         *Subject:* Trouble with large state
> >      >>      >         Hello Flink users,
> >      >>      >
> >      >>      >         I have an application of around 10 enrichment
> >     joins.  All
> >      >>     events
> >      >>      >         are
> >      >>      >         read from kafka and have event timestamps.  The
> >     joins are
> >      >>     built
> >      >>      >         using
> >      >>      >         .cogroup, with a global window, triggering on
> every 1
> >      >>     event, plus a
> >      >>      >         custom evictor that drops records once a newer
> >     record
> >      >> for the
> >      >>      >         same ID
> >      >>      >         has been processed.  Deletes are represented by
> empty
> >      >>     events with
> >      >>      >         timestamp and ID (tombstones). That way, we can
> drop
> >      >>     records when
> >      >>      >         business logic dictates, as opposed to when a
> maximum
> >      >>     retention
> >      >>      >         has been
> >      >>      >         attained.  The application runs
> >     RocksDBStateBackend, on
> >      >>      >         Kubernetes on
> >      >>      >         AWS with local SSDs.
> >      >>      >
> >      >>      >         Unit tests show that the joins produce expected
> >      >> results.     On an
> >      >>      >         8 node
> >      >>      >         cluster, watermark output progress seems to
> >     indicate I
> >      >>     should be
> >      >>      >         able to
> >      >>      >         bootstrap my state of around 500GB in around 1
> >     day.  I am
> >      >>     able
> >      >>      >         to save
> >      >>      >         and restore savepoints for the first half an hour
> >     of run
> >      >>     time.
> >      >>      >
> >      >>      >         My current trouble is that after around 50GB of
> >     state,
> >      >> I stop
> >      >>      >         being able
> >      >>      >         to reliably take checkpoints or savepoints.  Some
> >     time
> >      >> after
> >      >>      >         that, I
> >      >>      >         start getting a variety of failures where the
> first
> >      >>     suspicious
> >      >>      >         log event
> >      >>      >         is a generic cluster connectivity error, such as:
> >      >>      >
> >      >>      >               1) java.io.IOException: Connecting the
> channel
> >      >> failed:
> >      >>      >         Connecting
> >      >>      >               to remote task manager +
> >     '/10.67.7.101:38955 <http://10.67.7.101:38955>
> >      >>     <http://10.67.7.101:38955>
> >      >>      >         <http://10.67.7.101:38955>' has failed. This
> >      >>      >               might indicate that the remote task manager
> has
> >      >>     been lost.
> >      >>      >
> >      >>      >               2) org.apache.flink.runtime.io
> >     <http://org.apache.flink.runtime.io>
> >      >>     <http://org.apache.flink.runtime.io>.network.netty.exception
> >      >>      >               .RemoteTransportException: Connection
> >     unexpectedly
> >      >>     closed
> >      >>      >         by remote
> >      >>      >               task manager 'null'. This might indicate
> >     that the
> >      >>     remote task
> >      >>      >               manager was lost.
> >      >>      >
> >      >>      >               3) Association with remote system
> >      >>      >               [akka.tcp://flink@10.67.6.66:34987
> >     <http://flink@10.67.6.66:34987>
> >      >>     <http://flink@10.67.6.66:34987>
> >      >>      >         <http://flink@10.67.6.66:34987>] has failed,
> >     address is
> >      >> now
> >      >>      >               gated for [50] ms. Reason: [Association
> >     failed with
> >      >>      >               [akka.tcp://flink@10.67.6.66:34987
> >     <http://flink@10.67.6.66:34987>
> >      >>     <http://flink@10.67.6.66:34987>
> >      >>      >         <http://flink@10.67.6.66:34987>]] Caused by:
> >      >>      >               [java.net <http://java.net>
> >     <http://java.net>.NoRouteToHostException:
> >      >>     No route to host]
> >      >>      >
> >      >>      >         I don't see any obvious out of memory errors on
> the
> >      >>     TaskManager UI.
> >      >>      >
> >      >>      >         Adding nodes to the cluster does not seem to
> >     increase the
> >      >>     maximum
> >      >>      >         savable state size.
> >      >>      >
> >      >>      >         I could enable HA, but for the time being I have
> been
> >      >>     leaving it
> >      >>      >         out to
> >      >>      >         avoid the possibility of masking deterministic
> >     faults.
> >      >>      >
> >      >>      >         Below are my configurations.
> >      >>      >
> >      >>      >         Thanks in advance for any advice.
> >      >>      >
> >      >>      >         Regards,
> >      >>      >
> >      >>      >
> >      >>      >         Jeff Henrikson
> >      >>      >
> >      >>      >
> >      >>      >
> >      >>      >         Flink version: 1.10
> >      >>      >
> >      >>      >         Configuration set via code:
> >      >>      >               parallelism=8
> >      >>      >               maxParallelism=64
> >      >>      >
> setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >      >>      >
> >      >> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >      >>      >               setTolerableCheckpointFailureNumber(1000)
> >      >>      >               setMaxConcurrentCheckpoints(1)
> >      >>      >
> >      >>      >
> >      >>
> >
>  enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> >      >>
> >      >>      >               RocksDBStateBackend
> >      >>      >
> >     setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> >      >>      >               setNumberOfTransferThreads(25)
> >      >>      >               setDbStoragePath points to a local nvme SSD
> >      >>      >
> >      >>      >         Configuration in flink-conf.yaml:
> >      >>      >
> >      >>      >               jobmanager.rpc.address: localhost
> >      >>      >               jobmanager.rpc.port: 6123
> >      >>      >               jobmanager.heap.size: 28000m
> >      >>      >               taskmanager.memory.process.size: 28000m
> >      >>      >               taskmanager.memory.jvm-metaspace.size: 512m
> >      >>      >               taskmanager.numberOfTaskSlots: 1
> >      >>      >               parallelism.default: 1
> >      >>      >               jobmanager.execution.failover-strategy: full
> >      >>      >
> >      >>      >               cluster.evenly-spread-out-slots: false
> >      >>      >
> >      >>      >               taskmanager.memory.network.fraction:
> >     0.2           #
> >      >>      >         default 0.1
> >      >>      >               taskmanager.memory.framework.off-heap.size:
> 2GB
> >      >>      >               taskmanager.memory.task.off-heap.size: 2GB
> >      >>      >
> >     taskmanager.network.memory.buffers-per-channel: 32
> >      >>     # default 2
> >      >>      >               taskmanager.memory.managed.fraction: 0.4
>     #
> >      >> docs say
> >      >>      >         default 0.1, but something seems to set 0.4
> >      >>      >               taskmanager.memory.task.off-heap.size:
> >     2048MB      #
> >      >>      >         default 128M
> >      >>      >
> >      >>      >               state.backend.fs.memory-threshold: 1048576
> >      >>      >               state.backend.fs.write-buffer-size: 10240000
> >      >>      >               state.backend.local-recovery: true
> >      >>      >               state.backend.rocksdb.writebuffer.size: 64MB
> >      >>      >               state.backend.rocksdb.writebuffer.count: 8
> >      >>      >
> >     state.backend.rocksdb.writebuffer.number-to-merge: 4
> >      >>      >
> >     state.backend.rocksdb.timer-service.factory: heap
> >      >>      >               state.backend.rocksdb.block.cache-size:
> >     64000000 #
> >      >>     default 8MB
> >      >>      >               state.backend.rocksdb.write-batch-size:
> >     16000000 #
> >      >>     default 2MB
> >      >>      >
> >      >>      >               web.checkpoints.history: 250
> >      >>      >
> >      >>
> >
>
>

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Bhaskar,

 > Glad to know some progress.

Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.

The throttling required to avoid crashing during snapshots seems to be 
quite different from the throttling required to avoid crashing between 
snapshots.  So the lowest common denominator is quite a large 
performance penalty.

What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.

Parameters:

     - 30 minutes minimum between snapshots
     - incremental snapshot mode
     - inputs throttled to 100 events per sec per input per slot,
       which is around 1/4 of the unthrottled throughput

Checkpoint history:

    ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
    12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
    11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
    10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
    9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
    8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
    7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
    6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
    5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
    4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
    3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
    2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B

As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.
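
To put numbers on that, here is a small sketch of the arithmetic. It assumes that "GB/minute" means the State Size column divided by the End to End Duration column of the checkpoint history above; the class and method names are made up for illustration. Under that reading, checkpoint 4 works out to roughly 13.6 GB/minute and checkpoint 12 to roughly 0.58 GB/minute.

```java
import java.util.Locale;

public final class CheckpointThroughput {

    /** State size in GB divided by the end-to-end duration expressed in minutes. */
    public static double gbPerMinute(double stateSizeGb, int hours, int minutes, int seconds) {
        double durationMinutes = hours * 60 + minutes + seconds / 60.0;
        return stateSizeGb / durationMinutes;
    }

    public static void main(String[] args) {
        // Rows copied from the checkpoint history: ID 4 (56s, 12.7 GB)
        // and ID 12 (1h 44m 55s, 60.5 GB).
        System.out.printf(Locale.ROOT, "checkpoint 4:  %.2f GB/min%n",
                gbPerMinute(12.7, 0, 0, 56));
        System.out.printf(Locale.ROOT, "checkpoint 12: %.2f GB/min%n",
                gbPerMinute(60.5, 1, 44, 55));
    }
}
```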

I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.

I'm really wishing state.backend.async=false worked for RocksDBStateBackend.

I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.
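
A minimal sketch of that pause logic, written without any Flink dependency so that it can be tried on its own. The class name CheckpointGate and the BooleanSupplier are hypothetical. In a real job the supplier might poll the JobManager's checkpoint statistics endpoint (/jobs/:jobid/checkpoints) and report whether any checkpoint is in progress, but that wiring is an assumption and is left out. The wait is capped on purpose: checkpoint barriers travel in-band with the records, so a throttler that blocks indefinitely could hold back the very barrier that is queued behind the blocked record.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical gate that pauses the caller while a checkpoint is reported in progress. */
public final class CheckpointGate {
    // Assumption: answers "is any checkpoint in progress?", e.g. from a REST poll.
    private final BooleanSupplier checkpointInProgress;
    private final long pollMillis;
    private final int maxPolls;

    public CheckpointGate(BooleanSupplier checkpointInProgress, long pollMillis, int maxPolls) {
        this.checkpointInProgress = checkpointInProgress;
        this.pollMillis = pollMillis;
        this.maxPolls = maxPolls;
    }

    /**
     * Sleeps in pollMillis steps while a checkpoint is reported in progress,
     * giving up after maxPolls steps. Returns the number of steps spent waiting.
     */
    public int awaitQuiet() throws InterruptedException {
        int polls = 0;
        while (polls < maxPolls && checkpointInProgress.getAsBoolean()) {
            Thread.sleep(pollMillis);
            polls++;
        }
        return polls;
    }
}
```

The throttling function would call awaitQuiet() before emitting each element. Whether pausing inputs this way actually helps is exactly the open question here.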

 > Where are you updating your state here? I
 > couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.

The two AppendingState subclasses invoked are HeapListState and 
HeapReducingState.  Neither has a support annotation on it, such as 
MapState's @PublicEvolving.

 > I suggested updating the flink managed state using onTimer over an
 > interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.

It looks like the closest subclass for my use case would be 
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:

1) between checkpoints, read join input and write join output, by 
loading any state reads from external state, but buffering all state 
changes in memory in some kind of data structure.

2) whenever a checkpoint arrives or the memory consumed by buffered 
writes gets too big, flush the writes to state.

Is that the gist of the idea about .onTimer?
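
To make the question concrete, steps 1 and 2 might look like the sketch below. It is plain Java with no Flink dependency: the class BufferedStateWriter is hypothetical, a java.util.Map stands in for the real state backend, and deletes (tombstones) are left out. Repeated writes to the same key overwrite each other in memory, so only the latest value per key reaches the backing state at flush time. Any change still sitting in the buffer when a snapshot is taken would not be part of that snapshot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical write buffer: coalesces state changes in memory and flushes them in batches. */
public final class BufferedStateWriter<K, V> {
    private final Map<K, V> backingState;              // stand-in for the state backend (assumption)
    private final Map<K, V> pending = new HashMap<>(); // changes not yet written to state
    private final int maxPending;
    private int flushedWrites = 0;

    public BufferedStateWriter(Map<K, V> backingState, int maxPending) {
        this.backingState = backingState;
        this.maxPending = maxPending;
    }

    /** Step 1: reads see buffered changes first, then fall back to the backing state. */
    public V get(K key) {
        return pending.containsKey(key) ? pending.get(key) : backingState.get(key);
    }

    /** Step 1: buffer the change; flush early if the buffer has grown too big. */
    public void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= maxPending) {
            flush();
        }
    }

    /** Step 2: write the buffered changes to state, e.g. from a timer callback. */
    public void flush() {
        backingState.putAll(pending);
        flushedWrites += pending.size();
        pending.clear();
    }

    /** Number of individual writes that have reached the backing state so far. */
    public int flushedWrites() {
        return flushedWrites;
    }
}
```

For example, 50 updates to one key followed by a single flush() would result in one write to the backing state instead of 50.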


Jeff



There are two paths from .coGroup to AppendingState.add

     path 1 of 2: .coGroup to HeapListState

         add:90, HeapListState {org.apache.flink.runtime.state.heap}
         processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
         processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
         processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}

 
             org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

               (windowAssigner is an instance of GlobalWindows)

             	@Override
             	public void processElement(StreamRecord<IN> element) throws Exception {
             		final Collection<W> elementWindows = windowAssigner.assignWindows(
             				element.getValue(), element.getTimestamp(), windowAssignerContext);

             		//if element is handled by none of assigned elementWindows
             		boolean isSkippedElement = true;

             		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

             		if (windowAssigner instanceof MergingWindowAssigner) {
                 . . .
             		} else {
             			for (W window : elementWindows) {

             				// check if the window is already inactive
             				if (isWindowLate(window)) {
             					continue;
             				}
             				isSkippedElement = false;

             				evictingWindowState.setCurrentNamespace(window);
             				evictingWindowState.add(element);

         =>

             org.apache.flink.runtime.state.heap.HeapListState#add:
                 	@Override
                 	public void add(V value) {
                 		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");

                 		final N namespace = currentNamespace;

                 		final StateTable<K, N, List<V>> map = stateTable;
                 		List<V> list = map.get(namespace);

                 		if (list == null) {
                 			list = new ArrayList<>();
                 			map.put(namespace, list);
                 		}
                 		list.add(value);
                 	}

     path 2 of 2: .coGroup to HeapReducingState

             add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
             onElement:49, CountTrigger {org.apache.flink.streaming.api.windowing.triggers}
             onElement:898, WindowOperator$Context {org.apache.flink.streaming.runtime.operators.windowing}
             processElement:210, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
             processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
             processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}

         	@Override
         	public void processElement(StreamRecord<IN> element) throws Exception {
         		final Collection<W> elementWindows = windowAssigner.assignWindows(
         				element.getValue(), element.getTimestamp(), windowAssignerContext);

         		//if element is handled by none of assigned elementWindows
         		boolean isSkippedElement = true;

         		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

         		if (windowAssigner instanceof MergingWindowAssigner) {
             . . .
         		} else {
         			for (W window : elementWindows) {

         				// check if the window is already inactive
         				if (isWindowLate(window)) {
         					continue;
         				}
         				isSkippedElement = false;

         				evictingWindowState.setCurrentNamespace(window);
         				evictingWindowState.add(element);

         				triggerContext.key = key;
         				triggerContext.window = window;
         				evictorContext.key = key;
         				evictorContext.window = window;

         				TriggerResult triggerResult = triggerContext.onElement(element);

         =>
         		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
         			return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);

         =>

         	@Override
         	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
         		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
         		count.add(1L);

         =>

             org.apache.flink.runtime.state.heap.HeapReducingState#add
               	@Override
               	public void add(V value) throws IOException {

               		if (value == null) {



On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
> Glad to hear of some progress. Where are you updating your state here? I 
> couldn't find any Flink managed state here.
> I suggested updating the Flink managed state using onTimer over an 
> interval equal to the checkpoint interval.
> 
> In your case, since you do throttling, it helped to maintain a fixed 
> rate per slot. Before, the rate was sporadic.
> It's definitely an IO bottleneck.
> 
> So now you can think of decoupling stateless scanning and stateful joins.
> For example, you can keep the stateless scan as a separate Flink job and keep 
> its output in some Kafka-like store.
> 
> From there you start your stateful joins. This would help you focus on 
> your stateful job much better.
> 
> Regards
> Bhaskar
> 
> 
> 
> 
> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenrik27@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Bhaskar,
> 
>     Based on your idea of limiting input to get better checkpoint behavior,
>     I made a ProcessFunction that constrains throughput to a number of events per
>     second per slot per input.  I do need to do some stateless input
>     scanning before joins.  The stateless part needs to be fast and does not
>     impact snapshots.  So I inserted the throttling after the input
>     preprocessing but before the stateful transformations.  There is a
>     significant difference of snapshot throughput (often 5x or larger) when
>     I change the throttle between 200 and 300 events per second (per slot
>     per input).
> 
>     Hope the throttling keeps being effective as I keep the job running
>     longer.
> 
>     Odd.  But likely a very effective way out of my problem.
> 
>     I wonder what drives it . . .  Thread contention?  IOPS contention?
> 
>     See ProcessFunction code below.
> 
>     Many thanks!
> 
> 
>     Jeff
> 
> 
> 
>     import org.apache.flink.streaming.api.functions.ProcessFunction
>     import org.apache.flink.util.Collector
> 
>     // Set eventsPerSecMax to -1 to disable the throttle
>     // TODO: Actual number of events can be slightly larger
>     // TODO: Remove pause correlation with system clock
> 
>     case class Throttler[T](eventsPerSecMax : Double) extends
>     ProcessFunction[T,T] {
>         var minutePrev = 0
>         var numEvents = 0
>         def minutes() = {
>           val ms = System.currentTimeMillis()
>           (ms / 1000 / 60).toInt
>         }
>         def increment() = {
>           val m = minutes()
>           if(m != minutePrev) {
>             // new wall-clock minute: remember it and restart the count
>             minutePrev = m
>             numEvents = 0
>           }
>           numEvents += 1
>         }
>         def eps() = {
>           numEvents/60.0
>         }
>         override def processElement(x: T, ctx: ProcessFunction[T,
>     T]#Context,
>     out: Collector[T]): Unit = {
>           increment()
>           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
>             Thread.sleep(1000L)
>           }
>           out.collect(x)
>         }
>     }
> 
>     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
>      > Bhaskar,
>      >
>      > Thank you for your thoughtful points.
>      >
>      >  > I want to discuss more on points (1) and (2)
>      >  > If we take care of them  rest will be good
>      >  >
>      >  > Coming to (1)
>      >  >
>      >  > Please try to give reasonable checkpoint interval time for
>     every job.
>      >  > Minimum checkpoint interval recommended by flink community is 3
>     minutes
>      >  > I think you should give minimum 3 minutes checkpoint interval
>     for all
>      >
>      > I have spent very little time testing with checkpoint intervals
>     of under
>      > 3 minutes.  I frequently test with intervals of 5 minutes and of 30
>      > minutes.  I also test with checkpoint intervals such as 60
>     minutes, and
>      > never (manual only).  In terms of which exceptions get thrown, I
>     don't
>      > see much difference between 5, 30, and 60 minutes.
>      >
>      > Infinity (no checkpoint interval) seems to be an interesting value,
>      > because before crashing, it seems to process around twice as much
>     state
>      > as with any finite checkpoint interval.  The largest savepoints I
>     have
>      > captured have been manually triggered using the /jobs/:jobid/stop
>     REST
>      > API.  I think it helps for the snapshot to be synchronous.
>      >
>      > One curiosity about the /jobs/:jobid/stop command is that from
>     time of
>      > the command, it often takes many minutes for the internal
>     processing to
>      > stop.
>      >
>      > Another curiosity about the /jobs/:jobid/stop command is that sometimes
>      > following a completed savepoint, the cluster goes back to running!
>      >
>      >  > Coming to (2)
>      >  >
>      >  > What's your input data rate?
>      >
>      > My application involves what I will call "main" events that are
>     enriched
>      > by "secondary" events.  While the secondary events have several
>      > different input streams, data types, and join keys, I will
>     estimate the
>      > secondary events all together.  My estimate for input rate is as
>     follows:
>      >
>      >      50M "main" events
>      >      50 secondary events for each main event, for a
>      >          total of around 2.5B input events
>      >      8 nodes
>      >      20 hours
>      >
>      > Combining these figures, we can estimate:
>      >
>      >      50000000*50/8/20/3600 = 4340 events/second/node
>      >
>      > I don't see how to act on your advice for (2).  Maybe your idea
>     is that
>      > during backfill/bootstrap, I artificially throttle the inputs to my
>      > application?
>      >
>      > 100% of my application state is due to .cogroup, which manages a
>      > HeapListState on its own.  I cannot think of any controls for
>     changing
>      > how .cogroup handles internal state per se.  I will paste below the
>      > Flink code path that .cogroup uses to update its internal state
>     when it
>      > runs my application.
>      >
>      > The only control I can think of with .cogroup that indirectly
>     impacts
>      > internal state is delayed triggering.
>      >
>      > Currently I use a trigger on every event, which I understand
>     creates a
>      > suboptimal number of events.  I previously experimented with delayed
>      > triggering, but I did not get good results.
>      >
>      > Just now I tried again ContinuousProcessingTimeTrigger of 30
>     seconds,
>      > with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint
>      > interval.  The first checkpoint failed, which has been rare when
>     I use
>      > all the same parameters except for triggering on every event.  So it
>      > looks worse not better.
>      >
>      > Thanks again,
>      >
>      >
>      > Jeff Henrikson
>      >
>      >
>      >
>      >
>      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>      >> Thanks for the reply.
>      >> I want to discuss more on points (1) and (2)
>      >> If we take care of them  rest will be good
>      >>
>      >> Coming to (1)
>      >>
>      >> Please try to give reasonable checkpoint interval time for every
>     job.
>      >> Minimum checkpoint interval recommended by flink community is 3
>     minutes
>      >> I think you should give minimum 3 minutes checkpoint interval for all
>      >>
>      >> Coming to (2)
>      >>
>      >> What's your input data rate?
>      >> For example you are seeing data at 100 msg/sec, For each message if
>      >> there is state changing and you are updating the state with
>     RocksDB,
>      >> it's going to
>      >> create 100 rows in 1 second at RocksDb end, On the average if 50
>      >> records have changed each second, even if you are using RocksDB
>      >> differentialstate = true,
>      >> there is no use. Because everytime 50% is new rows getting
>     added. So
>      >> the best bet is to update records with RocksDB only once in your
>      >> checkpoint interval.
>      >> Suppose your checkpoint interval is 5 minutes. If you update
>     RocksDB
>      >> state once in 5 minutes, then the rate at which new records
>     added to
>      >> RocksDB  will be 1 record/5min.
>      >> Whereas in your original scenario, 30000 records added to
>     rocksDB in 5
>      >> min. You can save 1:30000 ratio of records in addition to RocksDB.
>      >> Which will save a huge
>      >> redundant size addition to RocksDB. Ultimately your  state is
>     driven
>      >> by your checkpoint interval. From the input source you will go
>     back 5
>      >> min back and read the state, similarly from RocksDB side
>      >> also you can have a state update once in 5 min should work.
>     Otherwise
>      >> even if you add state there is no use.
>      >>
>      >> Regards
>      >> Bhaskar
>      >>
>      >> Try to update your RocksDB state in an interval equal to the
>      >> checkpoint interval. Otherwise in my case many times what's
>     observed is
>      >> state size grows unnecessarily.
>      >>
>      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
>     <jehenrik27@gmail.com <ma...@gmail.com>
>      >> <mailto:jehenrik27@gmail.com <ma...@gmail.com>>> wrote:
>      >>
>      >>     Vijay,
>      >>
>      >>     Thanks for your thoughts.  Below are answers to your questions.
>      >>
>      >>       > 1. What's your checkpoint interval?
>      >>
>      >>     I have used many different checkpoint intervals, ranging from 5
>      >> minutes
>      >>     to never.  I usually setMinPauseBetweenCheckpoints to the same
>      >> value as
>      >>     the checkpoint interval.
>      >>
>      >>       > 2. How frequently are you updating the state into RocksDB?
>      >>
>      >>     My understanding is that for .cogroup:
>      >>
>      >>         - Triggers control communication outside the operator
>      >>         - Evictors control cleanup of internal state
>      >>         - Configurations like write buffer size control the
>     frequency of
>      >>     state change at the storage layer
>      >>         - There is no control for how frequently the window state
>      >>     updates at
>      >>     the RocksDB API layer.
>      >>
>      >>     Thus, the state updates whenever data is ingested.
>      >>
>      >>       > 3. How many task managers are you using?
>      >>
>      >>     Usually I have been running with one slot per taskmanager. 
>     28GB of
>      >>     usable ram on each node.
>      >>
>      >>       > 4. How much data each task manager handles while taking the
>      >>     checkpoint?
>      >>
>      >>     Funny you should ask.  I would be okay with zero.
>      >>
>      >>     The application I am replacing has a latency of 36-48 hours,
>     so if I
>      >>     had
>      >>     to fully stop processing to take every snapshot
>     synchronously, it
>      >> might
>      >>     be seen as totally acceptable, especially for initial
>     bootstrap.
>      >> Also,
>      >>     the velocity of running this backfill is approximately 115x real
>      >>     time on
>      >>     8 nodes, so the steady-state run may not exhibit the failure
>     mode in
>      >>     question at all.
>      >>
>      >>     It has come as some frustration to me that, in the case of
>      >>     RocksDBStateBackend, the configuration key state.backend.async
>      >>     effectively has no meaningful way to be false.
>      >>
>      >>     The only way I have found in the existing code to get a
>     behavior like
>      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
>      >> drain=false
>      >>     and a URL.  This method of failing fast is the way that I
>     discovered
>      >>     that I needed to increase transfer threads from the default.
>      >>
>      >>     The reason I don't just run the whole backfill and then take one
>      >>     snapshot is that even in the absence of checkpoints, a very
>     similar
>      >>     congestion seems to take the cluster down when I am say
>     20-30% of the
>      >>     way through my backfill.
>      >>
>      >>     Reloading from my largest feasible snapshot makes it
>     possible to make
>      >>     another snapshot a bit larger before crash, but not by much.
>      >>
>      >>     On first glance, the code change to allow
>     RocksDBStateBackend into a
>      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
>     I was
>      >>     hoping to do the initial launch of my application without
>     needing to
>      >>     modify the framework.
>      >>
>      >>     Regards,
>      >>
>      >>
>      >>     Jeff Henrikson
>      >>
>      >>
>      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>      >>      > For me this seems to be an IO bottleneck at your task
>     manager.
>      >>      > I have a couple of queries:
>      >>      > 1. What's your checkpoint interval?
>      >>      > 2. How frequently are you updating the state into RocksDB?
>      >>      > 3. How many task managers are you using?
>      >>      > 4. How much data each task manager handles while taking the
>      >>     checkpoint?
>      >>      >
>      >>      > For points (3) and (4) , you should be very careful. I
>     feel you
>      >> are
>      >>      > stuck at this.
>      >>      > You try to scale vertically by increasing more CPU and
>     memory for
>      >>     each
>      >>      > task manager.
>      >>      > If not, try to scale horizontally so that each task
>     manager's IO
>      >>     gets reduced.
>      >>      > Apart from that check is there any bottleneck with the file
>      >> system.
>      >>      >
>      >>      > Regards
>      >>      > Bhaskar
>      >>      >
>      >>      >
>      >>      >
>      >>      >
>      >>      >
>      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor
>     <victtim@gmail.com <ma...@gmail.com>
>      >>     <mailto:victtim@gmail.com <ma...@gmail.com>>
>      >>      > <mailto:victtim@gmail.com <ma...@gmail.com>
>     <mailto:victtim@gmail.com <ma...@gmail.com>>>> wrote:
>      >>      >
>      >>      >     I had a similar problem.   I ended up solving by not
>      >> relying on
>      >>      >     checkpoints for recovery and instead re-read my input
>     sources
>      >>     (in my
>      >>      >     case a kafka topic) from the earliest offset and
>     rebuilding
>      >>     only the
>      >>      >     state I need.  I only need to care about the past 1 to 2
>      >> days of
>      >>      >     state so can afford to drop anything older.   My recovery
>      >>     time went
>      >>      >     from over an hour for just the first checkpoint to
>     under 10
>      >>     minutes.
>      >>      >
>      >>      >     Tim
>      >>      >
>      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang
>     <myasuka@live.com <ma...@live.com>
>      >>     <mailto:myasuka@live.com <ma...@live.com>>
>      >>      >     <mailto:myasuka@live.com <ma...@live.com>
>     <mailto:myasuka@live.com <ma...@live.com>>>> wrote:
>      >>      >
>      >>      >         Hi Jeff
>      >>      >
>      >>      >          1. "after around 50GB of state, I stop being able to
>      >>     reliably
>      >>      >             take checkpoints or savepoints. "
>      >>      >             What is the exact reason that job cannot complete
>      >>      >             checkpoint? Expired before completing or
>     decline by
>      >> some
>      >>      >             tasks? The former one is mainly caused by high
>      >>     back-pressure
>      >>      >             and the latter one is mainly due to some internal
>      >> error.
>      >>      >          2. Have you checked what reason the remote task
>     manager
>      >>     is lost?
>      >>      >             If the remote task manager is not crashed, it
>     might
>      >>     be due
>      >>      >             to GC impact, I think you might need to check
>      >>     task-manager
>      >>      >             logs and GC logs.
>      >>      >
>      >>      >         Best
>      >>      >         Yun Tang
>      >>      >
>      >>
>       ------------------------------------------------------------------------
>      >>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
>     <ma...@gmail.com>
>      >>     <mailto:jehenrik27@gmail.com <ma...@gmail.com>>
>      >>      >         <mailto:jehenrik27@gmail.com
>     <ma...@gmail.com>
>      >> <mailto:jehenrik27@gmail.com <ma...@gmail.com>>>>
>      >>      >         *Sent:* Thursday, June 18, 2020 1:46
>      >>      >         *To:* user <user@flink.apache.org
>     <ma...@flink.apache.org>
>      >>     <mailto:user@flink.apache.org
>     <ma...@flink.apache.org>> <mailto:user@flink.apache.org
>     <ma...@flink.apache.org>
>      >>     <mailto:user@flink.apache.org <ma...@flink.apache.org>>>>
>      >>      >         *Subject:* Trouble with large state
>      >>      >         Hello Flink users,
>      >>      >
>      >>      >         I have an application of around 10 enrichment
>     joins.  All
>      >>     events
>      >>      >         are
>      >>      >         read from kafka and have event timestamps.  The
>     joins are
>      >>     built
>      >>      >         using
>      >>      >         .cogroup, with a global window, triggering on every 1
>      >>     event, plus a
>      >>      >         custom evictor that drops records once a newer
>     record
>      >> for the
>      >>      >         same ID
>      >>      >         has been processed.  Deletes are represented by empty
>      >>     events with
>      >>      >         timestamp and ID (tombstones). That way, we can drop
>      >>     records when
>      >>      >         business logic dictates, as opposed to when a maximum
>      >>     retention
>      >>      >         has been
>      >>      >         attained.  The application runs
>     RocksDBStateBackend, on
>      >>      >         Kubernetes on
>      >>      >         AWS with local SSDs.
>      >>      >
>      >>      >         Unit tests show that the joins produce expected
>      >> results.     On an
>      >>      >         8 node
>      >>      >         cluster, watermark output progress seems to
>     indicate I
>      >>     should be
>      >>      >         able to
>      >>      >         bootstrap my state of around 500GB in around 1
>     day.  I am
>      >>     able
>      >>      >         to save
>      >>      >         and restore savepoints for the first half an hour
>     of run
>      >>     time.
>      >>      >
>      >>      >         My current trouble is that after around 50GB of
>     state,
>      >> I stop
>      >>      >         being able
>      >>      >         to reliably take checkpoints or savepoints.  Some
>     time
>      >> after
>      >>      >         that, I
>      >>      >         start getting a variety of failures where the first
>      >>     suspicious
>      >>      >         log event
>      >>      >         is a generic cluster connectivity error, such as:
>      >>      >
>      >>      >               1) java.io.IOException: Connecting the channel failed: Connecting
>      >>      >               to remote task manager + '/10.67.7.101:38955' has failed. This
>      >>      >               might indicate that the remote task manager has been lost.
>      >>      >
>      >>      >               2) org.apache.flink.runtime.io.network.netty.exception
>      >>      >               .RemoteTransportException: Connection unexpectedly closed by remote
>      >>      >               task manager 'null'. This might indicate that the remote task
>      >>      >               manager was lost.
>      >>      >
>      >>      >               3) Association with remote system
>      >>      >               [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
>      >>      >               gated for [50] ms. Reason: [Association failed with
>      >>      >               [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>      >>      >               [java.net.NoRouteToHostException: No route to host]
>      >>      >
>      >>      >         I don't see any obvious out of memory errors on the
>      >>     TaskManager UI.
>      >>      >
>      >>      >         Adding nodes to the cluster does not seem to
>     increase the
>      >>     maximum
>      >>      >         savable state size.
>      >>      >
>      >>      >         I could enable HA, but for the time being I have been
>      >>     leaving it
>      >>      >         out to
>      >>      >         avoid the possibility of masking deterministic
>     faults.
>      >>      >
>      >>      >         Below are my configurations.
>      >>      >
>      >>      >         Thanks in advance for any advice.
>      >>      >
>      >>      >         Regards,
>      >>      >
>      >>      >
>      >>      >         Jeff Henrikson
>      >>      >
>      >>      >
>      >>      >
>      >>      >         Flink version: 1.10
>      >>      >
>      >>      >         Configuration set via code:
>      >>      >               parallelism=8
>      >>      >               maxParallelism=64
>      >>      >               setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>      >>      >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>      >>      >               setTolerableCheckpointFailureNumber(1000)
>      >>      >               setMaxConcurrentCheckpoints(1)
>      >>      >               enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>      >>      >               RocksDBStateBackend
>      >>      >               setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>      >>      >               setNumberOfTransferThreads(25)
>      >>      >               setDbStoragePath points to a local nvme SSD
>      >>      >
>      >>      >         Configuration in flink-conf.yaml:
>      >>      >
>      >>      >               jobmanager.rpc.address: localhost
>      >>      >               jobmanager.rpc.port: 6123
>      >>      >               jobmanager.heap.size: 28000m
>      >>      >               taskmanager.memory.process.size: 28000m
>      >>      >               taskmanager.memory.jvm-metaspace.size: 512m
>      >>      >               taskmanager.numberOfTaskSlots: 1
>      >>      >               parallelism.default: 1
>      >>      >               jobmanager.execution.failover-strategy: full
>      >>      >
>      >>      >               cluster.evenly-spread-out-slots: false
>      >>      >
>      >>      >               taskmanager.memory.network.fraction: 0.2           # default 0.1
>      >>      >               taskmanager.memory.framework.off-heap.size: 2GB
>      >>      >               taskmanager.memory.task.off-heap.size: 2GB
>      >>      >               taskmanager.network.memory.buffers-per-channel: 32 # default 2
>      >>      >               taskmanager.memory.managed.fraction: 0.4     # docs say default 0.1, but something seems to set 0.4
>      >>      >               taskmanager.memory.task.off-heap.size: 2048MB      # default 128M
>      >>      >
>      >>      >               state.backend.fs.memory-threshold: 1048576
>      >>      >               state.backend.fs.write-buffer-size: 10240000
>      >>      >               state.backend.local-recovery: true
>      >>      >               state.backend.rocksdb.writebuffer.size: 64MB
>      >>      >               state.backend.rocksdb.writebuffer.count: 8
>      >>      >               state.backend.rocksdb.writebuffer.number-to-merge: 4
>      >>      >               state.backend.rocksdb.timer-service.factory: heap
>      >>      >               state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
>      >>      >               state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
>      >>      >
>      >>      >               web.checkpoints.history: 250
>      >>      >
>      >>
> 


Re: Trouble with large state

Posted by Vijay Bhaskar <bh...@gmail.com>.
Glad to hear of some progress. Where are you updating your state here? I
couldn't find any Flink managed state here.
I suggested updating the Flink managed state using onTimer over an interval
equal to the checkpoint interval.
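
To make the idea concrete, here is a minimal sketch in plain Java, with no Flink dependency. All names in it (IntervalFlushBuffer, onEvent, flush) are hypothetical, and the HashMap called "store" only stands in for the managed state. It keeps the newest value per key in memory and writes to the store once per interval, which is roughly what an onTimer callback would do in a real operator.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: buffer the newest value per key, write to the store once per interval. */
class IntervalFlushBuffer {
    private final Map<String, String> pending = new HashMap<>(); // newest value per key since last flush
    private final Map<String, String> store = new HashMap<>();   // stands in for managed state (assumption)
    private final long flushIntervalMs;
    private long lastFlushMs;
    private long storeWrites = 0;

    IntervalFlushBuffer(long flushIntervalMs, long startMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    /** Called once per event; an older pending value for the same key is overwritten. */
    void onEvent(String key, String value, long nowMs) {
        pending.put(key, value);
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** Writes every pending key to the store exactly once, then clears the buffer. */
    void flush(long nowMs) {
        store.putAll(pending);
        storeWrites += pending.size();
        pending.clear();
        lastFlushMs = nowMs;
    }

    long storeWrites() { return storeWrites; }

    String stored(String key) { return store.get(key); }

    public static void main(String[] args) {
        // 100 events/sec for one key over a 5 minute interval.
        IntervalFlushBuffer buffer = new IntervalFlushBuffer(300_000L, 0L);
        for (int i = 0; i < 30_000; i++) {
            buffer.onEvent("id-1", "v" + i, i * 10L);
        }
        buffer.flush(300_000L);
        System.out.println("store writes: " + buffer.storeWrites());
    }
}
```

One caveat with this kind of buffering: the in-memory map is not itself part of a checkpoint, so a real operator would have to flush it before each snapshot or rely on the source replaying the events.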

In your case, since you do throttling, it helped to maintain a fixed rate
per slot. Before, the rate was sporadic.
It's definitely an IO bottleneck.
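
If the goal is a steady rate rather than a pause tied to the wall-clock minute, one option is to space events by a minimum gap. The following is only a sketch in plain Java under that assumption; PacedThrottle and reserve are hypothetical names, not a Flink API. The caller passes in the current time, so the pacing logic can be checked without sleeping.

```java
/** Hypothetical sketch of an evenly paced throttle: at most one event per minimum gap. */
class PacedThrottle {
    private final long minGapNanos;  // minimum spacing between events; 0 disables the throttle
    private long nextAllowedNanos;   // earliest time at which the next event may pass

    PacedThrottle(double eventsPerSecMax, long startNanos) {
        // A non-positive limit disables throttling, as with -1 in the Scala version.
        this.minGapNanos = eventsPerSecMax > 0 ? (long) (1_000_000_000L / eventsPerSecMax) : 0L;
        this.nextAllowedNanos = startNanos;
    }

    /** Reserves the next slot and returns how many nanoseconds the caller should wait first. */
    long reserve(long nowNanos) {
        long slotStart = Math.max(nowNanos, nextAllowedNanos);
        nextAllowedNanos = slotStart + minGapNanos;
        return slotStart - nowNanos;
    }

    /** Convenience wrapper: reserve a slot against the system clock and sleep until it starts. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
    }
}
```

In a throttling function, acquire() would be called once per element before emitting it. Because the waits are spread across the whole second, the downstream stateful operators see a smoother load than with a one-second sleep after a per-minute budget is used up.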

So now you can think of decoupling stateless scanning and stateful joins.
For example, you can keep the stateless scan as a separate Flink job and keep
its output in some Kafka-like store.

From there you start your stateful joins. This would help you focus on your
stateful job much better.

Regards
Bhaskar




On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <je...@gmail.com> wrote:

> Bhaskar,
>
> Based on your idea of limiting input to get better checkpoint behavior,
> I made a ProcessFunction that constrains throughput to a number of events per
> second per slot per input.  I do need to do some stateless input
> scanning before joins.  The stateless part needs to be fast and does not
> impact snapshots.  So I inserted the throttling after the input
> preprocessing but before the stateful transformations.  There is a
> significant difference of snapshot throughput (often 5x or larger) when
> I change the throttle between 200 and 300 events per second (per slot
> per input).
>
> Hope the throttling keeps being effective as I keep the job running longer.
>
> Odd.  But likely a very effective way out of my problem.
>
> I wonder what drives it . . .  Thread contention?  IOPS contention?
>
> See ProcessFunction code below.
>
> Many thanks!
>
>
> Jeff
>
>
>
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> // Set eventsPerSecMax to -1 to disable the throttle
> // TODO: Actual number of events can be slightly larger
> // TODO: Remove pause correlation with system clock
>
> case class Throttler[T](eventsPerSecMax : Double) extends
> ProcessFunction[T,T] {
>    var minutePrev = 0
>    var numEvents = 0
>    def minutes() = {
>      val ms = System.currentTimeMillis()
>      (ms / 1000 / 60).toInt
>    }
>    def increment() = {
>      val m = minutes()
>      if(m != minutePrev) {
>        // new wall-clock minute: remember it and restart the count
>        minutePrev = m
>        numEvents = 0
>      }
>      numEvents += 1
>    }
>    def eps() = {
>      numEvents/60.0
>    }
>    override def processElement(x: T, ctx: ProcessFunction[T, T]#Context,
> out: Collector[T]): Unit = {
>      increment()
>      if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
>        Thread.sleep(1000L)
>      }
>      out.collect(x)
>    }
> }
>
> On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> > Bhaskar,
> >
> > Thank you for your thoughtful points.
> >
> >  > I want to discuss more on points (1) and (2)
> >  > If we take care of them  rest will be good
> >  >
> >  > Coming to (1)
> >  >
> >  > Please try to give reasonable checkpoint interval time for every job.
> >  > Minimum checkpoint interval recommended by flink community is 3 minutes
> >  > I think you should give minimum 3 minutes checkpoint interval for all
> >
> > I have spent very little time testing with checkpoint intervals of under
> > 3 minutes.  I frequently test with intervals of 5 minutes and of 30
> > minutes.  I also test with checkpoint intervals such as 60 minutes, and
> > never (manual only).  In terms of which exceptions get thrown, I don't
> > see much difference between 5, 30, and 60 minutes.
> >
> > Infinity (no checkpoint interval) seems to be an interesting value,
> > because before crashing, it seems to process around twice as much state
> > as with any finite checkpoint interval.  The largest savepoints I have
> > captured have been manually triggered using the /jobs/:jobid/stop REST
> > API.  I think it helps for the snapshot to be synchronous.
> >
> > One curiosity about the /jobs/:jobid/stop command is that from time of
> > the command, it often takes many minutes for the internal processing to
> > stop.
> >
> > Another curiosity about the /jobs/:jobid/stop command is that sometimes
> > following a completed savepoint, the cluster goes back to running!
> >
> >  > Coming to (2)
> >  >
> >  > What's your input data rate?
> >
> > My application involves what I will call "main" events that are enriched
> > by "secondary" events.  While the secondary events have several
> > different input streams, data types, and join keys, I will estimate the
> > secondary events all together.  My estimate for input rate is as follows:
> >
> >      50M "main" events
> >      50 secondary events for each main event, for a
> >          total of around 2.5B input events
> >      8 nodes
> >      20 hours
> >
> > Combining these figures, we can estimate:
> >
> >      50000000*50/8/20/3600 = 4340 events/second/node
> >
> > I don't see how to act on your advice for (2).  Maybe your idea is that
> > during backfill/bootstrap, I artificially throttle the inputs to my
> > application?
> >
> > 100% of my application state is due to .cogroup, which manages a
> > HeapListState on its own.  I cannot think of any controls for changing
> > how .cogroup handles internal state per se.  I will paste below the
> > Flink code path that .cogroup uses to update its internal state when it
> > runs my application.
> >
> > The only control I can think of with .cogroup that indirectly impacts
> > internal state is delayed triggering.
> >
> > Currently I use a trigger on every event, which I understand creates a
> > suboptimal number of events.  I previously experimented with delayed
> > triggering, but I did not get good results.
> >
> > Just now I tried again ContinuousProcessingTimeTrigger of 30 seconds,
> > with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint
> > interval.  The first checkpoint failed, which has been rare when I use
> > all the same parameters except for triggering on every event.  So it
> > looks worse not better.
> >
> > Thanks again,
> >
> >
> > Jeff Henrikson
> >
> >
> >
> >
> > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
> >> Thanks for the reply.
> >> I want to discuss more on points (1) and (2)
> >> If we take care of them  rest will be good
> >>
> >> Coming to (1)
> >>
> >> Please try to give reasonable checkpoint interval time for every job.
> >> The minimum checkpoint interval recommended by the Flink community
> >> is 3 minutes.
> >> I think you should give a minimum 3-minute checkpoint interval for all
> >>
> >> Coming to (2)
> >>
> >> What's your input data rate?
> >> For example you are seeing data at 100 msg/sec, For each message if
> >> there is state changing and you are updating the state with RocksDB,
> >> it's going to
> >> create 100 rows in 1 second at RocksDb end, On the average if 50
> >> records have changed each second, even if you are using RocksDB
> >> differentialstate = true,
> >> there is no use. Because everytime 50% is new rows getting added. So
> >> the best bet is to update records with RocksDB only once in your
> >> checkpoint interval.
> >> Suppose your checkpoint interval is 5 minutes. If you update RocksDB
> >> state once in 5 minutes, then the rate at which new records added to
> >> RocksDB  will be 1 record/5min.
> >> Whereas in your original scenario, 30000 records added to rocksDB in 5
> >> min. You can save 1:30000 ratio of records in addition to RocksDB.
> >> Which will save a huge
> >> redundant size addition to RocksDB. Ultimately your  state is driven
> >> by your checkpoint interval. From the input source you will go back 5
> >> min back and read the state, similarly from RocksDB side
> >> also you can have a state update once in 5 min should work. Otherwise
> >> even if you add state there is no use.
> >>
> >> Regards
> >> Bhaskar
> >>
> >> Try to update your RocksDB state in an interval equal to the
> >> checkpoint interval. Otherwise in my case many times what's observed is
> >> state size grows unnecessarily.
> >>
> >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenrik27@gmail.com
> >> <ma...@gmail.com>> wrote:
> >>
> >>     Vijay,
> >>
> >>     Thanks for your thoughts.  Below are answers to your questions.
> >>
> >>       > 1. What's your checkpoint interval?
> >>
> >>     I have used many different checkpoint intervals, ranging from
> >>     5 minutes to never.  I usually call setMinPauseBetweenCheckpoints
> >>     with the same value as the checkpoint interval.
> >>
> >>       > 2. How frequently are you updating the state into RocksDB?
> >>
> >>     My understanding is that for .cogroup:
> >>
> >>         - Triggers control communication outside the operator
> >>         - Evictors control cleanup of internal state
> >>         - Configurations like write buffer size control the frequency of
> >>     state change at the storage layer
> >>         - There is no control for how frequently the window state
> >>     updates at
> >>     the layer of the RocksDB api layer.
> >>
> >>     Thus, the state updates whenever data is ingested.
> >>
> >>       > 3. How many task managers are you using?
> >>
> >>     Usually I have been running with one slot per taskmanager.  28GB of
> >>     usable ram on each node.
> >>
> >>       > 4. How much data each task manager handles while taking the
> >>     checkpoint?
> >>
> >>     Funny you should ask.  I would be okay with zero.
> >>
> >>     The application I am replacing has a latency of 36-48 hours, so if I
> >>     had
> >>     to fully stop processing to take every snapshot synchronously, it
> >> might
> >>     be seen as totally acceptable, especially for initial bootstrap.
> >> Also,
> >>     the velocity of running this backfill is approximately 115x real
> >>     time on
> >>     8 nodes, so the steady-state run may not exhibit the failure mode in
> >>     question at all.
> >>
> >>     It has come as some frustration to me that, in the case of
> >>     RocksDBStateBackend, the configuration key state.backend.async
> >>     effectively has no meaningful way to be false.
> >>
> >>     The only way I have found in the existing code to get a behavior
> like
> >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
> >> drain=false
> >>     and a URL.  This method of failing fast is the way that I discovered
> >>     that I needed to increase transfer threads from the default.
> >>
> >>     The reason I don't just run the whole backfill and then take one
> >>     snapshot is that even in the absence of checkpoints, a very similar
> >>     congestion seems to take the cluster down when I am say 20-30% of
> the
> >>     way through my backfill.
> >>
> >>     Reloading from my largest feasible snapshot makes it possible to
> make
> >>     another snapshot a bit larger before crash, but not by much.
> >>
> >>     On first glance, the code change to allow RocksDBStateBackend into a
> >>     synchronous snapshots mode looks pretty easy.  Nevertheless, I was
> >>     hoping to do the initial launch of my application without needing to
> >>     modify the framework.
> >>
> >>     Regards,
> >>
> >>
> >>     Jeff Henrikson
> >>
> >>
> >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> >>      > For me this seems to be an IO bottleneck at your task manager.
> >>      > I have a couple of queries:
> >>      > 1. What's your checkpoint interval?
> >>      > 2. How frequently are you updating the state into RocksDB?
> >>      > 3. How many task managers are you using?
> >>      > 4. How much data each task manager handles while taking the
> >>     checkpoint?
> >>      >
> >>      > For points (3) and (4) , you should be very careful. I feel you
> >> are
> >>      > stuck at this.
> >>      > You try to scale vertically by increasing more CPU and memory for
> >>     each
> >>      > task manager.
> >>      > If not, try to scale horizontally so that each task manager IO
> >>     gets reduces
> >>      > Apart from that check is there any bottleneck with the file
> >> system.
> >>      >
> >>      > Regards
> >>      > Bhaskar
> >>      >
> >>      >
> >>      >
> >>      >
> >>      >
> >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <
> victtim@gmail.com
> >>     <ma...@gmail.com>
> >>      > <mailto:victtim@gmail.com <ma...@gmail.com>>> wrote:
> >>      >
> >>      >     I had a similar problem.   I ended up solving by not
> >> relying on
> >>      >     checkpoints for recovery and instead re-read my input sources
> >>     (in my
> >>      >     case a kafka topic) from the earliest offset and rebuilding
> >>     only the
> >>      >     state I need.  I only need to care about the past 1 to 2
> >> days of
> >>      >     state so can afford to drop anything older.   My recovery
> >>     time went
> >>      >     from over an hour for just the first checkpoint to under 10
> >>     minutes.
> >>      >
> >>      >     Tim
> >>      >
> >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com
> >>     <ma...@live.com>
> >>      >     <mailto:myasuka@live.com <ma...@live.com>>> wrote:
> >>      >
> >>      >         Hi Jeff
> >>      >
> >>      >          1. "after around 50GB of state, I stop being able to
> >>     reliably
> >>      >             take checkpoints or savepoints. "
> >>      >             What is the exact reason that job cannot complete
> >>      >             checkpoint? Expired before completing or decline by
> >> some
> >>      >             tasks? The former one is manly caused by high
> >>     back-pressure
> >>      >             and the later one is mainly due to some internal
> >> error.
> >>      >          2. Have you checked what reason the remote task manager
> >>     is lost?
> >>      >             If the remote task manager is not crashed, it might
> >>     be due
> >>      >             to GC impact, I think you might need to check
> >>     task-manager
> >>      >             logs and GC logs.
> >>      >
> >>      >         Best
> >>      >         Yun Tang
> >>      >
> >>
>  ------------------------------------------------------------------------
> >>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
> >>     <ma...@gmail.com>
> >>      >         <mailto:jehenrik27@gmail.com
> >> <ma...@gmail.com>>>
> >>      >         *Sent:* Thursday, June 18, 2020 1:46
> >>      >         *To:* user <user@flink.apache.org
> >>     <ma...@flink.apache.org> <mailto:user@flink.apache.org
> >>     <ma...@flink.apache.org>>>
> >>      >         *Subject:* Trouble with large state
> >>      >         Hello Flink users,
> >>      >
> >>      >         I have an application of around 10 enrichment joins.  All
> >>     events
> >>      >         are
> >>      >         read from kafka and have event timestamps.  The joins are
> >>     built
> >>      >         using
> >>      >         .cogroup, with a global window, triggering on every 1
> >>     event, plus a
> >>      >         custom evictor that drops records once a newer record
> >> for the
> >>      >         same ID
> >>      >         has been processed.  Deletes are represented by empty
> >>     events with
> >>      >         timestamp and ID (tombstones). That way, we can drop
> >>     records when
> >>      >         business logic dictates, as opposed to when a maximum
> >>     retention
> >>      >         has been
> >>      >         attained.  The application runs RocksDBStateBackend, on
> >>      >         Kubernetes on
> >>      >         AWS with local SSDs.
> >>      >
> >>      >         Unit tests show that the joins produce expected
> >> results.     On an
> >>      >         8 node
> >>      >         cluster, watermark output progress seems to indicate I
> >>     should be
> >>      >         able to
> >>      >         bootstrap my state of around 500GB in around 1 day.  I am
> >>     able
> >>      >         to save
> >>      >         and restore savepoints for the first half an hour of run
> >>     time.
> >>      >
> >>      >         My current trouble is that after around 50GB of state,
> >> I stop
> >>      >         being able
> >>      >         to reliably take checkpoints or savepoints.  Some time
> >> after
> >>      >         that, I
> >>      >         start getting a variety of failures where the first
> >>     suspicious
> >>      >         log event
> >>      >         is a generic cluster connectivity error, such as:
> >>      >
> >>      >               1) java.io.IOException: Connecting the channel failed:
> >>      >               Connecting to remote task manager + '/10.67.7.101:38955'
> >>      >               has failed. This might indicate that the remote task
> >>      >               manager has been lost.
> >>      >
> >>      >               2) org.apache.flink.runtime.io.network.netty.exception
> >>      >               .RemoteTransportException: Connection unexpectedly closed
> >>      >               by remote task manager 'null'. This might indicate that
> >>      >               the remote task manager was lost.
> >>      >
> >>      >               3) Association with remote system
> >>      >               [akka.tcp://flink@10.67.6.66:34987] has failed, address is
> >>      >               now gated for [50] ms. Reason: [Association failed with
> >>      >               [akka.tcp://flink@10.67.6.66:34987]] Caused by:
> >>      >               [java.net.NoRouteToHostException: No route to host]
> >>      >
> >>      >         I don't see any obvious out of memory errors on the
> >>     TaskManager UI.
> >>      >
> >>      >         Adding nodes to the cluster does not seem to increase the
> >>     maximum
> >>      >         savable state size.
> >>      >
> >>      >         I could enable HA, but for the time being I have been
> >>     leaving it
> >>      >         out to
> >>      >         avoid the possibility of masking deterministic faults.
> >>      >
> >>      >         Below are my configurations.
> >>      >
> >>      >         Thanks in advance for any advice.
> >>      >
> >>      >         Regards,
> >>      >
> >>      >
> >>      >         Jeff Henrikson
> >>      >
> >>      >
> >>      >
> >>      >         Flink version: 1.10
> >>      >
> >>      >         Configuration set via code:
> >>      >               parallelism=8
> >>      >               maxParallelism=64
> >>      >     setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>      >
> >> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >>      >               setTolerableCheckpointFailureNumber(1000)
> >>      >               setMaxConcurrentCheckpoints(1)
> >>      >
> >>      >
> >>
>  enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> >>
> >>      >               RocksDBStateBackend
> >>      >     setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> >>      >               setNumberOfTransferThreads(25)
> >>      >               setDbStoragePath points to a local nvme SSD
> >>      >
> >>      >         Configuration in flink-conf.yaml:
> >>      >
> >>      >               jobmanager.rpc.address: localhost
> >>      >               jobmanager.rpc.port: 6123
> >>      >               jobmanager.heap.size: 28000m
> >>      >               taskmanager.memory.process.size: 28000m
> >>      >               taskmanager.memory.jvm-metaspace.size: 512m
> >>      >               taskmanager.numberOfTaskSlots: 1
> >>      >               parallelism.default: 1
> >>      >               jobmanager.execution.failover-strategy: full
> >>      >
> >>      >               cluster.evenly-spread-out-slots: false
> >>      >
> >>      >               taskmanager.memory.network.fraction: 0.2
> #
> >>      >         default 0.1
> >>      >               taskmanager.memory.framework.off-heap.size: 2GB
> >>      >               taskmanager.memory.task.off-heap.size: 2GB
> >>      >               taskmanager.network.memory.buffers-per-channel: 32
> >>     # default 2
> >>      >               taskmanager.memory.managed.fraction: 0.4     #
> >> docs say
> >>      >         default 0.1, but something seems to set 0.4
> >>      >               taskmanager.memory.task.off-heap.size: 2048MB
> #
> >>      >         default 128M
> >>      >
> >>      >               state.backend.fs.memory-threshold: 1048576
> >>      >               state.backend.fs.write-buffer-size: 10240000
> >>      >               state.backend.local-recovery: true
> >>      >               state.backend.rocksdb.writebuffer.size: 64MB
> >>      >               state.backend.rocksdb.writebuffer.count: 8
> >>      >               state.backend.rocksdb.writebuffer.number-to-merge:
> 4
> >>      >               state.backend.rocksdb.timer-service.factory: heap
> >>      >               state.backend.rocksdb.block.cache-size: 64000000 #
> >>     default 8MB
> >>      >               state.backend.rocksdb.write-batch-size: 16000000 #
> >>     default 2MB
> >>      >
> >>      >               web.checkpoints.history: 250
> >>      >
> >>
>

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Bhaskar,

Based on your idea of limiting input to get better checkpoint behavior,
I made a ProcessFunction that constrains throughput to a fixed number of
events per second per slot per input.  I do need to do some stateless
input scanning before the joins.  The stateless part needs to be fast
and does not impact snapshots.  So I inserted the throttling after the
input preprocessing but before the stateful transformations.  There is a
significant difference in snapshot throughput (often 5x or larger) when
I change the throttle between 200 and 300 events per second (per slot
per input).
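
For a rough sense of what such a throttle costs in backfill time, the
figures from earlier in this thread (about 2.5B input events, 8 slots)
can be combined with the per-slot, per-input limit.  This is only a
sketch: numInputs is a hypothetical placeholder rather than my real
input count, and it assumes every input runs at exactly the limit for
the whole backfill.

```scala
object BackfillEstimate {
  // Hours needed to push totalEvents through the throttle, assuming every
  // slot and every input runs at exactly the configured limit.
  def hours(totalEvents: Double, epsPerSlotPerInput: Double,
            slots: Int, numInputs: Int): Double = {
    val aggregateEps = epsPerSlotPerInput * slots * numInputs
    totalEvents / aggregateEps / 3600.0
  }

  def main(args: Array[String]): Unit = {
    val totalEvents = 50e6 * 50 // estimate quoted earlier in the thread
    val slots = 8               // one slot per node
    val numInputs = 10          // hypothetical; substitute the real count
    for (eps <- Seq(200.0, 300.0)) {
      val h = hours(totalEvents, eps, slots, numInputs)
      println(f"$eps%.0f events/s/slot/input -> $h%.1f hours")
    }
  }
}
```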

I hope the throttling stays effective as I keep the job running longer.

Odd.  But likely a very effective way out of my problem.

I wonder what drives it . . .  Thread contention?  IOPS contention?

See ProcessFunction code below.

Many thanks!


Jeff



import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Set eventsPerSecMax to -1 to disable the throttle
// TODO: Actual number of events can be slightly larger
// TODO: Remove pause correlation with system clock

case class Throttler[T](eventsPerSecMax : Double)
    extends ProcessFunction[T,T] {
   var minutePrev = 0
   var numEvents = 0
   def minutes() = {
     val ms = System.currentTimeMillis()
     (ms / 1000 / 60).toInt
   }
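   // Count one event; restart the count when the wall-clock minute changes
   def increment() = {
     val m = minutes()
     if(m != minutePrev) {
       minutePrev = m  // remember the minute so the count is not reset on every event
       numEvents = 0
     }
     numEvents += 1
   }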
   def eps() = {
     numEvents/60.0
   }
   override def processElement(x: T, ctx: ProcessFunction[T, T]#Context,
                               out: Collector[T]): Unit = {
     increment()
     if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
       Thread.sleep(1000L)
     }
     out.collect(x)
   }
}
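
As a possible direction for the two TODOs above, here is a minimal
sketch of a pacer that spaces events 1/eventsPerSecMax apart instead of
counting per wall-clock minute.  It is not what I am running; the names
Pacer, acquire, now and sleep are made up for illustration, and the
clock and sleep functions are injected only so the logic can be
exercised without Flink.

```scala
// Spaces events at least 1/eventsPerSecMax seconds apart.
// A non-positive eventsPerSecMax disables pacing.
class Pacer(eventsPerSecMax: Double,
            now: () => Long = () => System.nanoTime(),
            sleep: Long => Unit =
              ns => Thread.sleep(ns / 1000000L, (ns % 1000000L).toInt)) {

  // Minimum gap between events in nanoseconds (0 means disabled)
  private val intervalNs: Long =
    if (eventsPerSecMax > 0) (1e9 / eventsPerSecMax).toLong else 0L

  // Earliest time at which the next event may pass
  private var nextAllowedNs: Long = Long.MinValue

  // Blocks until the next event is allowed; returns the nanoseconds waited.
  def acquire(): Long = {
    if (intervalNs == 0L) {
      0L
    } else {
      val t = now()
      val waitNs = if (nextAllowedNs > t) nextAllowedNs - t else 0L
      if (waitNs > 0L) sleep(waitNs)
      // Schedule the following event one interval after this one
      nextAllowedNs = math.max(nextAllowedNs, t) + intervalNs
      waitNs
    }
  }
}
```

In the Throttler, processElement would call acquire() before
out.collect(x).  The Pacer would need to be created per task instance
(for example in open()), since the default clock and sleep functions are
not meant to be serialized with the job graph.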

On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> Bhaskar,
> 
> Thank you for your thoughtful points.
> 
>  > I want to discuss more on points (1) and (2)
>  > If we take care of them  rest will be good
>  >
>  > Coming to (1)
>  >
>  > Please try to give reasonable checkpoint interval time for every job.
>  > The minimum checkpoint interval recommended by the Flink community
>  > is 3 minutes.
>  > I think you should give a minimum 3-minute checkpoint interval for all
> 
> I have spent very little time testing with checkpoint intervals of under 
> 3 minutes.  I frequently test with intervals of 5 minutes and of 30 
> minutes.  I also test with checkpoint intervals such as 60 minutes, and 
> never (manual only).  In terms of which exceptions get thrown, I don't 
> see much difference between 5/30/60.
> 
> Infinity (no checkpoint interval) seems to be an interesting value, 
> because before crashing, it seems to process around twice as much state 
> as with any finite checkpoint interval.  The largest savepoints I have 
> captured have been manually triggered using the /job/:jobid/stop REST 
> API.  I think it helps for the snapshot to be synchronous.
> 
> One curiosity about the /job/:jobid/stop command is that from time of 
> the command, it often takes many minutes for the internal processing to 
> stop.
> 
> Another curiosity about /job/:jobid/stop command is that sometimes 
> following a completed savepoint, the cluster goes back to running!
> 
>  > Coming to (2)
>  >
>  > What's your input data rate?
> 
> My application involves what I will call "main" events that are enriched 
> by "secondary" events.  While the secondary events have several 
> different input streams, data types, and join keys, I will estimate the 
> secondary events all together.  My estimate for input rate is as follows:
> 
>      50M "main" events
>      50 secondary events for each main event, for a
>          total of around 2.5B input events
>      8 nodes
>      20 hours
> 
> Combining these figures, we can estimate:
> 
>      50000000*50/8/20/3600 = 4340 events/second/node
> 
> I don't see how to act on your advice for (2).  Maybe your idea is that 
> during backfill/bootstrap, I artificially throttle the inputs to my 
> application?
> 
> 100% of my application state is due to .cogroup, which manages a 
> HeapListState on its own.  I cannot think of any controls for changing 
> how .cogroup handles internal state per se.  I will paste below the 
> Flink code path that .cogroup uses to update its internal state when it 
> runs my application.
> 
> The only control I can think of with .cogroup that indirectly impacts 
> internal state is delayed triggering.
> 
> Currently I use a trigger on every event, which I understand creates a 
> suboptimal number of events.  I previously experimented with delayed 
> triggering, but I did not get good results.
> 
> Just now I tried again ContinuousProcessingTimeTrigger of 30 seconds, 
> with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint 
> interval.  The first checkpoint failed, which has been rare when I use 
> all the same parameters except for triggering on every event.  So it 
> looks worse not better.
> 
> Thanks again,
> 
> 
> Jeff Henrikson
> 
> 
> 
> 
> On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>> Thanks for the reply.
>> I want to discuss more on points (1) and (2)
>> If we take care of them  rest will be good
>>
>> Coming to (1)
>>
>> Please try to give reasonable checkpoint interval time for every job. 
>> The minimum checkpoint interval recommended by the Flink community
>> is 3 minutes.
>> I think you should give a minimum 3-minute checkpoint interval for all
>>
>> Coming to (2)
>>
>> What's your input data rate?
>> For example you are seeing data at 100 msg/sec, For each message if 
>> there is state changing and you are updating the state with RocksDB, 
>> it's going to
>> create 100 rows in 1 second at RocksDb end, On the average if 50 
>> records have changed each second, even if you are using RocksDB 
>> differentialstate = true,
>> there is no use. Because everytime 50% is new rows getting added. So 
>> the best bet is to update records with RocksDB only once in your 
>> checkpoint interval.
>> Suppose your checkpoint interval is 5 minutes. If you update RocksDB 
>> state once in 5 minutes, then the rate at which new records added to 
>> RocksDB  will be 1 record/5min.
>> Whereas in your original scenario, 30000 records added to rocksDB in 5 
>> min. You can save 1:30000 ratio of records in addition to RocksDB. 
>> Which will save a huge
>> redundant size addition to RocksDB. Ultimately your  state is driven 
>> by your checkpoint interval. From the input source you will go back 5 
>> min back and read the state, similarly from RocksDB side
>> also you can have a state update once in 5 min should work. Otherwise 
>> even if you add state there is no use.
>>
>> Regards
>> Bhaskar
>>
>> Try to update your RocksDB state in an interval equal to the 
>> checkpoint interval. Otherwise in my case many times what's observed is
>> state size grows unnecessarily.
>>
>> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenrik27@gmail.com 
>> <ma...@gmail.com>> wrote:
>>
>>     Vijay,
>>
>>     Thanks for your thoughts.  Below are answers to your questions.
>>
>>       > 1. What's your checkpoint interval?
>>
>>     I have used many different checkpoint intervals, ranging from
>>     5 minutes to never.  I usually call setMinPauseBetweenCheckpoints
>>     with the same value as the checkpoint interval.
>>
>>       > 2. How frequently are you updating the state into RocksDB?
>>
>>     My understanding is that for .cogroup:
>>
>>         - Triggers control communication outside the operator
>>         - Evictors control cleanup of internal state
>>         - Configurations like write buffer size control the frequency of
>>     state change at the storage layer
>>         - There is no control for how frequently the window state
>>     updates at
>>     the layer of the RocksDB api layer.
>>
>>     Thus, the state updates whenever data is ingested.
>>
>>       > 3. How many task managers are you using?
>>
>>     Usually I have been running with one slot per taskmanager.  28GB of
>>     usable ram on each node.
>>
>>       > 4. How much data each task manager handles while taking the
>>     checkpoint?
>>
>>     Funny you should ask.  I would be okay with zero.
>>
>>     The application I am replacing has a latency of 36-48 hours, so if I
>>     had
>>     to fully stop processing to take every snapshot synchronously, it 
>> might
>>     be seen as totally acceptable, especially for initial bootstrap.  
>> Also,
>>     the velocity of running this backfill is approximately 115x real
>>     time on
>>     8 nodes, so the steady-state run may not exhibit the failure mode in
>>     question at all.
>>
>>     It has come as some frustration to me that, in the case of
>>     RocksDBStateBackend, the configuration key state.backend.async
>>     effectively has no meaningful way to be false.
>>
>>     The only way I have found in the existing code to get a behavior like
>>     synchronous snapshot is to POST to /jobs/<jobID>/stop with 
>> drain=false
>>     and a URL.  This method of failing fast is the way that I discovered
>>     that I needed to increase transfer threads from the default.
>>
>>     The reason I don't just run the whole backfill and then take one
>>     snapshot is that even in the absence of checkpoints, a very similar
>>     congestion seems to take the cluster down when I am say 20-30% of the
>>     way through my backfill.
>>
>>     Reloading from my largest feasible snapshot makes it possible to make
>>     another snapshot a bit larger before crash, but not by much.
>>
>>     On first glance, the code change to allow RocksDBStateBackend into a
>>     synchronous snapshots mode looks pretty easy.  Nevertheless, I was
>>     hoping to do the initial launch of my application without needing to
>>     modify the framework.
>>
>>     Regards,
>>
>>
>>     Jeff Henrikson
>>
>>
>>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>>      > For me this seems to be an IO bottleneck at your task manager.
>>      > I have a couple of queries:
>>      > 1. What's your checkpoint interval?
>>      > 2. How frequently are you updating the state into RocksDB?
>>      > 3. How many task managers are you using?
>>      > 4. How much data each task manager handles while taking the
>>     checkpoint?
>>      >
>>      > For points (3) and (4) , you should be very careful. I feel you 
>> are
>>      > stuck at this.
>>      > You try to scale vertically by increasing more CPU and memory for
>>     each
>>      > task manager.
>>      > If not, try to scale horizontally so that each task manager IO
>>     gets reduces
>>      > Apart from that check is there any bottleneck with the file 
>> system.
>>      >
>>      > Regards
>>      > Bhaskar
>>      >
>>      >
>>      >
>>      >
>>      >
>>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com
>>     <ma...@gmail.com>
>>      > <mailto:victtim@gmail.com <ma...@gmail.com>>> wrote:
>>      >
>>      >     I had a similar problem.   I ended up solving by not 
>> relying on
>>      >     checkpoints for recovery and instead re-read my input sources
>>     (in my
>>      >     case a kafka topic) from the earliest offset and rebuilding
>>     only the
>>      >     state I need.  I only need to care about the past 1 to 2 
>> days of
>>      >     state so can afford to drop anything older.   My recovery
>>     time went
>>      >     from over an hour for just the first checkpoint to under 10
>>     minutes.
>>      >
>>      >     Tim
>>      >
>>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com
>>     <ma...@live.com>
>>      >     <mailto:myasuka@live.com <ma...@live.com>>> wrote:
>>      >
>>      >         Hi Jeff
>>      >
>>      >          1. "after around 50GB of state, I stop being able to
>>     reliably
>>      >             take checkpoints or savepoints. "
>>      >             What is the exact reason that job cannot complete
>>      >             checkpoint? Expired before completing or decline by 
>> some
>>      >             tasks? The former one is manly caused by high
>>     back-pressure
>>      >             and the later one is mainly due to some internal 
>> error.
>>      >          2. Have you checked what reason the remote task manager
>>     is lost?
>>      >             If the remote task manager is not crashed, it might
>>     be due
>>      >             to GC impact, I think you might need to check
>>     task-manager
>>      >             logs and GC logs.
>>      >
>>      >         Best
>>      >         Yun Tang
>>      >      
>>  ------------------------------------------------------------------------
>>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
>>     <ma...@gmail.com>
>>      >         <mailto:jehenrik27@gmail.com 
>> <ma...@gmail.com>>>
>>      >         *Sent:* Thursday, June 18, 2020 1:46
>>      >         *To:* user <user@flink.apache.org
>>     <ma...@flink.apache.org> <mailto:user@flink.apache.org
>>     <ma...@flink.apache.org>>>
>>      >         *Subject:* Trouble with large state
>>      >         Hello Flink users,
>>      >
>>      >         I have an application of around 10 enrichment joins.  All
>>     events
>>      >         are
>>      >         read from kafka and have event timestamps.  The joins are
>>     built
>>      >         using
>>      >         .cogroup, with a global window, triggering on every 1
>>     event, plus a
>>      >         custom evictor that drops records once a newer record 
>> for the
>>      >         same ID
>>      >         has been processed.  Deletes are represented by empty
>>     events with
>>      >         timestamp and ID (tombstones). That way, we can drop
>>     records when
>>      >         business logic dictates, as opposed to when a maximum
>>     retention
>>      >         has been
>>      >         attained.  The application runs RocksDBStateBackend, on
>>      >         Kubernetes on
>>      >         AWS with local SSDs.
>>      >
>>      >         Unit tests show that the joins produce expected 
>> results.     On an
>>      >         8 node
>>      >         cluster, watermark output progress seems to indicate I
>>     should be
>>      >         able to
>>      >         bootstrap my state of around 500GB in around 1 day.  I am
>>     able
>>      >         to save
>>      >         and restore savepoints for the first half an hour of run
>>     time.
>>      >
>>      >         My current trouble is that after around 50GB of state, 
>> I stop
>>      >         being able
>>      >         to reliably take checkpoints or savepoints.  Some time 
>> after
>>      >         that, I
>>      >         start getting a variety of failures where the first
>>     suspicious
>>      >         log event
>>      >         is a generic cluster connectivity error, such as:
>>      >
>>      >               1) java.io.IOException: Connecting the channel failed:
>>      >               Connecting to remote task manager + '/10.67.7.101:38955'
>>      >               has failed. This might indicate that the remote task
>>      >               manager has been lost.
>>      >
>>      >               2) org.apache.flink.runtime.io.network.netty.exception
>>      >               .RemoteTransportException: Connection unexpectedly closed
>>      >               by remote task manager 'null'. This might indicate that
>>      >               the remote task manager was lost.
>>      >
>>      >               3) Association with remote system
>>      >               [akka.tcp://flink@10.67.6.66:34987] has failed, address is
>>      >               now gated for [50] ms. Reason: [Association failed with
>>      >               [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>>      >               [java.net.NoRouteToHostException: No route to host]
>>      >
>>      >         I don't see any obvious out of memory errors on the
>>     TaskManager UI.
>>      >
>>      >         Adding nodes to the cluster does not seem to increase the
>>     maximum
>>      >         savable state size.
>>      >
>>      >         I could enable HA, but for the time being I have been
>>     leaving it
>>      >         out to
>>      >         avoid the possibility of masking deterministic faults.
>>      >
>>      >         Below are my configurations.
>>      >
>>      >         Thanks in advance for any advice.
>>      >
>>      >         Regards,
>>      >
>>      >
>>      >         Jeff Henrikson
>>      >
>>      >
>>      >
>>      >         Flink version: 1.10
>>      >
>>      >         Configuration set via code:
>>      >               parallelism=8
>>      >               maxParallelism=64
>>      >               setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>      >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>      >               setTolerableCheckpointFailureNumber(1000)
>>      >               setMaxConcurrentCheckpoints(1)
>>      >
>>      >               enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>      >               RocksDBStateBackend
>>      >               setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>      >               setNumberOfTransferThreads(25)
>>      >               setDbStoragePath points to a local nvme SSD
>>      >
>>      >         Configuration in flink-conf.yaml:
>>      >
>>      >               jobmanager.rpc.address: localhost
>>      >               jobmanager.rpc.port: 6123
>>      >               jobmanager.heap.size: 28000m
>>      >               taskmanager.memory.process.size: 28000m
>>      >               taskmanager.memory.jvm-metaspace.size: 512m
>>      >               taskmanager.numberOfTaskSlots: 1
>>      >               parallelism.default: 1
>>      >               jobmanager.execution.failover-strategy: full
>>      >
>>      >               cluster.evenly-spread-out-slots: false
>>      >
>>      >               taskmanager.memory.network.fraction: 0.2           # default 0.1
>>      >               taskmanager.memory.framework.off-heap.size: 2GB
>>      >               taskmanager.memory.task.off-heap.size: 2GB
>>      >               taskmanager.network.memory.buffers-per-channel: 32 # default 2
>>      >               taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
>>      >               taskmanager.memory.task.off-heap.size: 2048MB      # default 128M
>>      >
>>      >               state.backend.fs.memory-threshold: 1048576
>>      >               state.backend.fs.write-buffer-size: 10240000
>>      >               state.backend.local-recovery: true
>>      >               state.backend.rocksdb.writebuffer.size: 64MB
>>      >               state.backend.rocksdb.writebuffer.count: 8
>>      >               state.backend.rocksdb.writebuffer.number-to-merge: 4
>>      >               state.backend.rocksdb.timer-service.factory: heap
>>      >               state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
>>      >               state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
>>      >
>>      >               web.checkpoints.history: 250
>>      >
>>

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Bhaskar,

Thank you for your thoughtful points.

 > I want to discuss more on points (1) and (2)
 > If we take care of them  rest will be good
 >
 > Coming to (1)
 >
 > Please try to give reasonable checkpoint interval time for every job.
 > Minimum checkpoint interval recommended by flink community is 3 minutes
 > I think you should give minimum 3 minutes checkpoint interval for all

I have spent very little time testing with checkpoint intervals of under 
3 minutes.  I frequently test with intervals of 5 minutes and of 30 
minutes.  I also test with checkpoint intervals such as 60 minutes, and 
never (manual only).  In terms of which exceptions get thrown, I don't 
see much difference between intervals of 5, 30, and 60 minutes.

Infinity (no checkpoint interval) seems to be an interesting value, 
because before crashing, it seems to process around twice as much state 
as with any finite checkpoint interval.  The largest savepoints I have 
captured have been manually triggered using the /jobs/:jobid/stop REST 
API.  I think it helps for the snapshot to be synchronous.

One curiosity about the /jobs/:jobid/stop command is that from the time 
of the command, it often takes many minutes for the internal processing 
to stop.

Another curiosity about the /jobs/:jobid/stop command is that sometimes, 
following a completed savepoint, the cluster goes back to running!
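
For reference, the stop request discussed above can be sketched in plain Java (11 or later) as follows. This only builds the HTTP request and does not send it. The REST base URL, the all-zero job ID, and the savepoint directory are placeholders, and the JSON body is assembled naively without escaping; the field names targetDirectory and drain are, to my understanding, the ones the stop-with-savepoint endpoint accepts.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds (but does not send) a stop-with-savepoint request.
final class StopWithSavepoint {

    // Naive JSON assembly; assumes targetDirectory needs no escaping.
    static String requestBody(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\": \"" + targetDirectory
                + "\", \"drain\": " + drain + "}";
    }

    // restBase is the JobManager REST address, e.g. http://localhost:8081 (placeholder).
    static HttpRequest build(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(targetDirectory, false)))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical job ID and savepoint location, for illustration only.
        HttpRequest req = build("http://localhost:8081",
                "00000000000000000000000000000000", "file:///tmp/savepoints");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

As far as I understand the REST API, the call returns a trigger ID right away and the savepoint itself completes asynchronously, with its status available under /jobs/:jobid/savepoints/:triggerid. That may be part of why processing appears to continue for a while after the command is issued.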

 > Coming to (2)
 >
 > What's your input data rate?

My application involves what I will call "main" events that are enriched 
by "secondary" events.  While the secondary events have several 
different input streams, data types, and join keys, I will estimate the 
secondary events all together.  My estimate for input rate is as follows:

     50M "main" events
     50 secondary events for each main event, for a
         total of around 2.5B input events
     8 nodes
     20 hours

Combining these figures, we can estimate:

     50000000*50/8/20/3600 = 4340 events/second/node
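
The same arithmetic as a small Java sketch, in case anyone wants to plug in different figures. The class and method names are made up for illustration; only the four inputs come from the estimate above, and the result is truncated to whole events.

```java
// Back-of-the-envelope backfill rate, using only the figures quoted above.
final class BackfillRate {

    /** Events per second per node, truncated to a whole number. */
    static long eventsPerSecondPerNode(long mainEvents, long secondaryPerMain,
                                       long nodes, long hours) {
        long totalEvents = mainEvents * secondaryPerMain; // about 2.5B input events
        long nodeSeconds = nodes * hours * 3600L;         // total node-seconds available
        return totalEvents / nodeSeconds;
    }

    public static void main(String[] args) {
        // 50M main events, 50 secondary events each, 8 nodes, 20 hours
        System.out.println(eventsPerSecondPerNode(50_000_000L, 50L, 8L, 20L));
    }
}
```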

I don't see how to act on your advice for (2).  Maybe your idea is that 
during backfill/bootstrap, I artificially throttle the inputs to my 
application?

100% of my application state is due to .cogroup, which manages a 
HeapListState on its own.  I cannot think of any controls for changing 
how .cogroup handles internal state per se.  I will paste below the 
Flink code path that .cogroup uses to update its internal state when it 
runs my application.

The only control I can think of with .cogroup that indirectly impacts 
internal state is delayed triggering.

Currently I use a trigger on every event, which I understand creates a 
suboptimal number of events.  I previously experimented with delayed 
triggering, but I did not get good results.

Just now I tried again with a ContinuousProcessingTimeTrigger of 30 
seconds, with rocksdb.timer-service.factory: heap, and a 5 minute 
checkpoint interval.  The first checkpoint failed, which has been rare 
when I use all the same parameters except for triggering on every event.  
So it looks worse, not better.

Thanks again,


Jeff Henrikson




On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
> Thanks for the reply.
> I want to discuss more on points (1) and (2)
> If we take care of them  rest will be good
> 
> Coming to (1)
> 
> Please try to give reasonable checkpoint interval time for every job. 
> Minum checkpoint interval recommended by flink community is 3 minutes
> I thin you should give minimum 3 minutes checkpoint interval for all
> 
> Coming to (2)
> 
> What's your input data rate?
> For example you are seeing data at 100 msg/sec, For each message if 
> there is state changing and you are updating the state with RocksDB, 
> it's going to
> create 100 rows in 1 second at RocksDb end, On the average if 50 records 
> have changed each second, even if you are using RocksDB 
> differentialstate = true,
> there is no use. Because everytime 50% is new rows getting added. So the 
> best bet is to update records with RocksDB only once in your checkpoint 
> interval.
> Suppose your checkpoint interval is 5 minutes. If you update RocksDB 
> state once in 5 minutes, then the rate at which new records added to 
> RocksDB  will be 1 record/5min.
> Whereas in your original scenario, 30000 records added to rocksDB in 5 
> min. You can save 1:30000 ratio of records in addition to RocksDB. Which 
> will save a huge
> redundant size addition to RocksDB. Ultimately your  state is driven by 
> your checkpoint interval. From the input source you will go back 5 min 
> back and read the state, similarly from RocksDB side
> also you can have a state update once in 5 min should work. Otherwise 
> even if you add state there is no use.
> 
> Regards
> Bhaskar
> 
> Try to update your RocksDB state in an interval equal to the checkpoint 
> interval. Otherwise in my case many times what's observed is
> state size grows unnecessarily.
> 
> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenrik27@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Vijay,
> 
>     Thanks for your thoughts.  Below are answers to your questions.
> 
>       > 1. What's your checkpoint interval?
> 
>     I have used many different checkpoint intervals, ranging from 5 minutes
>     to never.  I usually setMinPauseBetweenCheckpoints to the same value as
>     the checkpoint interval.
> 
>       > 2. How frequently are you updating the state into RocksDB?
> 
>     My understanding is that for .cogroup:
> 
>         - Triggers control communication outside the operator
>         - Evictors control cleanup of internal state
>         - Configurations like write buffer size control the frequency of
>     state change at the storage layer
>         - There is no control for how frequently the window state
>     updates at
>     the layer of the RocksDB api layer.
> 
>     Thus, the state update whenever data is ingested.
> 
>       > 3. How many task managers are you using?
> 
>     Usually I have been running with one slot per taskmanager.  28GB of
>     usable ram on each node.
> 
>       > 4. How much data each task manager handles while taking the
>     checkpoint?
> 
>     Funny you should ask.  I would be okay with zero.
> 
>     The application I am replacing has a latency of 36-48 hours, so if I
>     had
>     to fully stop processing to take every snapshot synchronously, it might
>     be seen as totally acceptable, especially for initial bootstrap.  Also,
>     the velocity of running this backfill is approximately 115x real
>     time on
>     8 nodes, so the steady-state run may not exhibit the failure mode in
>     question at all.
> 
>     It has come as some frustration to me that, in the case of
>     RocksDBStateBackend, the configuration key state.backend.async
>     effectively has no meaningful way to be false.
> 
>     The only way I have found in the existing code to get a behavior like
>     synchronous snapshot is to POST to /jobs/<jobID>/stop with drain=false
>     and a URL.  This method of failing fast is the way that I discovered
>     that I needed to increase transfer threads from the default.
> 
>     The reason I don't just run the whole backfill and then take one
>     snapshot is that even in the absence of checkpoints, a very similar
>     congestion seems to take the cluster down when I am say 20-30% of the
>     way through my backfill.
> 
>     Reloading from my largest feasible snapshot makes it possible to make
>     another snapshot a bit larger before crash, but not by much.
> 
>     On first glance, the code change to allow RocksDBStateBackend into a
>     synchronous snapshots mode looks pretty easy.  Nevertheless, I was
>     hoping to do the initial launch of my application without needing to
>     modify the framework.
> 
>     Regards,
> 
> 
>     Jeff Henrikson
> 
> 
>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>      > For me this seems to be an IO bottleneck at your task manager.
>      > I have a couple of queries:
>      > 1. What's your checkpoint interval?
>      > 2. How frequently are you updating the state into RocksDB?
>      > 3. How many task managers are you using?
>      > 4. How much data each task manager handles while taking the
>     checkpoint?
>      >
>      > For points (3) and (4) , you should be very careful. I feel you are
>      > stuck at this.
>      > You try to scale vertically by increasing more CPU and memory for
>     each
>      > task manager.
>      > If not, try to scale horizontally so that each task manager IO
>     gets reduces
>      > Apart from that check is there any bottleneck with the file system.
>      >
>      > Regards
>      > Bhaskar
>      >
>      >
>      >
>      >
>      >
>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com
>     <ma...@gmail.com>
>      > <mailto:victtim@gmail.com <ma...@gmail.com>>> wrote:
>      >
>      >     I had a similar problem.   I ended up solving by not relying on
>      >     checkpoints for recovery and instead re-read my input sources
>     (in my
>      >     case a kafka topic) from the earliest offset and rebuilding
>     only the
>      >     state I need.  I only need to care about the past 1 to 2 days of
>      >     state so can afford to drop anything older.   My recovery
>     time went
>      >     from over an hour for just the first checkpoint to under 10
>     minutes.
>      >
>      >     Tim
>      >
>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com
>     <ma...@live.com>
>      >     <mailto:myasuka@live.com <ma...@live.com>>> wrote:
>      >
>      >         Hi Jeff
>      >
>      >          1. "after around 50GB of state, I stop being able to
>     reliably
>      >             take checkpoints or savepoints. "
>      >             What is the exact reason that job cannot complete
>      >             checkpoint? Expired before completing or decline by some
>      >             tasks? The former one is manly caused by high
>     back-pressure
>      >             and the later one is mainly due to some internal error.
>      >          2. Have you checked what reason the remote task manager
>     is lost?
>      >             If the remote task manager is not crashed, it might
>     be due
>      >             to GC impact, I think you might need to check
>     task-manager
>      >             logs and GC logs.
>      >
>      >         Best
>      >         Yun Tang
>      >       
>       ------------------------------------------------------------------------
>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
>     <ma...@gmail.com>
>      >         <mailto:jehenrik27@gmail.com <ma...@gmail.com>>>
>      >         *Sent:* Thursday, June 18, 2020 1:46
>      >         *To:* user <user@flink.apache.org
>     <ma...@flink.apache.org> <mailto:user@flink.apache.org
>     <ma...@flink.apache.org>>>
>      >         *Subject:* Trouble with large state
>      >         Hello Flink users,
>      >
>      >         I have an application of around 10 enrichment joins.  All
>     events
>      >         are
>      >         read from kafka and have event timestamps.  The joins are
>     built
>      >         using
>      >         .cogroup, with a global window, triggering on every 1
>     event, plus a
>      >         custom evictor that drops records once a newer record for the
>      >         same ID
>      >         has been processed.  Deletes are represented by empty
>     events with
>      >         timestamp and ID (tombstones). That way, we can drop
>     records when
>      >         business logic dictates, as opposed to when a maximum
>     retention
>      >         has been
>      >         attained.  The application runs RocksDBStateBackend, on
>      >         Kubernetes on
>      >         AWS with local SSDs.
>      >
>      >         Unit tests show that the joins produce expected results. 
>     On an
>      >         8 node
>      >         cluster, watermark output progress seems to indicate I
>     should be
>      >         able to
>      >         bootstrap my state of around 500GB in around 1 day.  I am
>     able
>      >         to save
>      >         and restore savepoints for the first half an hour of run
>     time.
>      >
>      >         My current trouble is that after around 50GB of state, I stop
>      >         being able
>      >         to reliably take checkpoints or savepoints.  Some time after
>      >         that, I
>      >         start getting a variety of failures where the first
>     suspicious
>      >         log event
>      >         is a generic cluster connectivity error, such as:
>      >
>      >               1) java.io.IOException: Connecting the channel failed:
>      >         Connecting
>      >               to remote task manager + '/10.67.7.101:38955
>     <http://10.67.7.101:38955>
>      >         <http://10.67.7.101:38955>' has failed. This
>      >               might indicate that the remote task manager has
>     been lost.
>      >
>      >               2) org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.netty.exception
>      >               .RemoteTransportException: Connection unexpectedly
>     closed
>      >         by remote
>      >               task manager 'null'. This might indicate that the
>     remote task
>      >               manager was lost.
>      >
>      >               3) Association with remote system
>      >               [akka.tcp://flink@10.67.6.66:34987
>     <http://flink@10.67.6.66:34987>
>      >         <http://flink@10.67.6.66:34987>] has failed, address is now
>      >               gated for [50] ms. Reason: [Association failed with
>      >               [akka.tcp://flink@10.67.6.66:34987
>     <http://flink@10.67.6.66:34987>
>      >         <http://flink@10.67.6.66:34987>]] Caused by:
>      >               [java.net <http://java.net>.NoRouteToHostException:
>     No route to host]
>      >
>      >         I don't see any obvious out of memory errors on the
>     TaskManager UI.
>      >
>      >         Adding nodes to the cluster does not seem to increase the
>     maximum
>      >         savable state size.
>      >
>      >         I could enable HA, but for the time being I have been
>     leaving it
>      >         out to
>      >         avoid the possibility of masking deterministic faults.
>      >
>      >         Below are my configurations.
>      >
>      >         Thanks in advance for any advice.
>      >
>      >         Regards,
>      >
>      >
>      >         Jeff Henrikson
>      >
>      >
>      >
>      >         Flink version: 1.10
>      >
>      >         Configuration set via code:
>      >               parallelism=8
>      >               maxParallelism=64
>      >              
>     setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>      >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>      >               setTolerableCheckpointFailureNumber(1000)
>      >               setMaxConcurrentCheckpoints(1)
>      >
>      >       
>       enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>      >               RocksDBStateBackend
>      >              
>     setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>      >               setNumberOfTransferThreads(25)
>      >               setDbStoragePath points to a local nvme SSD
>      >
>      >         Configuration in flink-conf.yaml:
>      >
>      >               jobmanager.rpc.address: localhost
>      >               jobmanager.rpc.port: 6123
>      >               jobmanager.heap.size: 28000m
>      >               taskmanager.memory.process.size: 28000m
>      >               taskmanager.memory.jvm-metaspace.size: 512m
>      >               taskmanager.numberOfTaskSlots: 1
>      >               parallelism.default: 1
>      >               jobmanager.execution.failover-strategy: full
>      >
>      >               cluster.evenly-spread-out-slots: false
>      >
>      >               taskmanager.memory.network.fraction: 0.2           #
>      >         default 0.1
>      >               taskmanager.memory.framework.off-heap.size: 2GB
>      >               taskmanager.memory.task.off-heap.size: 2GB
>      >               taskmanager.network.memory.buffers-per-channel: 32
>     # default 2
>      >               taskmanager.memory.managed.fraction: 0.4          
>     # docs say
>      >         default 0.1, but something seems to set 0.4
>      >               taskmanager.memory.task.off-heap.size: 2048MB      #
>      >         default 128M
>      >
>      >               state.backend.fs.memory-threshold: 1048576
>      >               state.backend.fs.write-buffer-size: 10240000
>      >               state.backend.local-recovery: true
>      >               state.backend.rocksdb.writebuffer.size: 64MB
>      >               state.backend.rocksdb.writebuffer.count: 8
>      >               state.backend.rocksdb.writebuffer.number-to-merge: 4
>      >               state.backend.rocksdb.timer-service.factory: heap
>      >               state.backend.rocksdb.block.cache-size: 64000000 #
>     default 8MB
>      >               state.backend.rocksdb.write-batch-size: 16000000 #
>     default 2MB
>      >
>      >               web.checkpoints.history: 250
>      >
> 

Re: Trouble with large state

Posted by Vijay Bhaskar <bh...@gmail.com>.
Thanks for the reply.
I want to discuss points (1) and (2) further.
If we take care of them, the rest will be fine.

Coming to (1)

Please try to give a reasonable checkpoint interval for every job. The
minimum checkpoint interval recommended by the Flink community is 3 minutes.
I think you should use a checkpoint interval of at least 3 minutes for all jobs.

Coming to (2)

What's your input data rate?
For example, suppose you are seeing data at 100 msg/sec. If every message
changes state and you update the state in RocksDB each time, that creates
100 rows per second on the RocksDB side. If on average 50 records change
each second, then even with RocksDB differentialstate = true there is no
benefit, because 50% of the rows are new every time. So the best bet is to
update records in RocksDB only once per checkpoint interval.
Suppose your checkpoint interval is 5 minutes. If you update RocksDB state
once in 5 minutes, then the rate at which new records are added to RocksDB
will be 1 record/5min.
Whereas in your original scenario, 30000 records are added to RocksDB in
5 min. That is a 1:30000 reduction in the records added to RocksDB, which
avoids a huge amount of redundant growth in RocksDB. Ultimately your state
is driven by your checkpoint interval. From the input source you will go
back 5 min and re-read, so on the RocksDB side a state update once in
5 min should work as well. Otherwise, updating state more often than that
brings no benefit.
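
To make the 1:30000 figure concrete, here is a minimal plain-Java sketch of the coalescing idea: updates are kept in memory per key and only the latest value is written out at flush time. This is not Flink or RocksDB API; the class and its methods are hypothetical, and in a real job whatever is buffered this way would still have to be covered by checkpointing or by replay from the source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory buffer: keeps only the latest value per key
// and hands the coalesced batch to the caller at flush time.
final class CoalescingBuffer {
    private final Map<String, String> latest = new HashMap<>();

    // Overwrites any earlier value buffered for the same key.
    void update(String key, String value) {
        latest.put(key, value);
    }

    // Returns one entry per key touched since the last flush, then clears the buffer.
    Map<String, String> flush() {
        Map<String, String> batch = new HashMap<>(latest);
        latest.clear();
        return batch;
    }

    public static void main(String[] args) {
        CoalescingBuffer buffer = new CoalescingBuffer();
        // 100 msg/sec for 5 minutes, all for the same key: 30000 updates.
        for (int i = 0; i < 100 * 300; i++) {
            buffer.update("id-1", "value-" + i);
        }
        // Only the latest value survives, so the flush writes a single record.
        System.out.println(buffer.flush().size());
    }
}
```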

Regards
Bhaskar

Try to update your RocksDB state at an interval equal to the checkpoint
interval. Otherwise, as I have observed many times in my case, the
state size grows unnecessarily.

On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <je...@gmail.com>
wrote:

> Vijay,
>
> Thanks for your thoughts.  Below are answers to your questions.
>
>  > 1. What's your checkpoint interval?
>
> I have used many different checkpoint intervals, ranging from 5 minutes
> to never.  I usually setMinPauseBetweenCheckpoints to the same value as
> the checkpoint interval.
>
>  > 2. How frequently are you updating the state into RocksDB?
>
> My understanding is that for .cogroup:
>
>    - Triggers control communication outside the operator
>    - Evictors control cleanup of internal state
>    - Configurations like write buffer size control the frequency of
> state change at the storage layer
>    - There is no control for how frequently the window state updates at
> the layer of the RocksDB api layer.
>
> Thus, the state update whenever data is ingested.
>
>  > 3. How many task managers are you using?
>
> Usually I have been running with one slot per taskmanager.  28GB of
> usable ram on each node.
>
>  > 4. How much data each task manager handles while taking the checkpoint?
>
> Funny you should ask.  I would be okay with zero.
>
> The application I am replacing has a latency of 36-48 hours, so if I had
> to fully stop processing to take every snapshot synchronously, it might
> be seen as totally acceptable, especially for initial bootstrap.  Also,
> the velocity of running this backfill is approximately 115x real time on
> 8 nodes, so the steady-state run may not exhibit the failure mode in
> question at all.
>
> It has come as some frustration to me that, in the case of
> RocksDBStateBackend, the configuration key state.backend.async
> effectively has no meaningful way to be false.
>
> The only way I have found in the existing code to get a behavior like
> synchronous snapshot is to POST to /jobs/<jobID>/stop with drain=false
> and a URL.  This method of failing fast is the way that I discovered
> that I needed to increase transfer threads from the default.
>
> The reason I don't just run the whole backfill and then take one
> snapshot is that even in the absence of checkpoints, a very similar
> congestion seems to take the cluster down when I am say 20-30% of the
> way through my backfill.
>
> Reloading from my largest feasible snapshot makes it possible to make
> another snapshot a bit larger before crash, but not by much.
>
> On first glance, the code change to allow RocksDBStateBackend into a
> synchronous snapshots mode looks pretty easy.  Nevertheless, I was
> hoping to do the initial launch of my application without needing to
> modify the framework.
>
> Regards,
>
>
> Jeff Henrikson
>
>
> On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> > For me this seems to be an IO bottleneck at your task manager.
> > I have a couple of queries:
> > 1. What's your checkpoint interval?
> > 2. How frequently are you updating the state into RocksDB?
> > 3. How many task managers are you using?
> > 4. How much data each task manager handles while taking the checkpoint?
> >
> > For points (3) and (4) , you should be very careful. I feel you are
> > stuck at this.
> > You try to scale vertically by increasing more CPU and memory for each
> > task manager.
> > If not, try to scale horizontally so that each task manager IO gets
> reduces
> > Apart from that check is there any bottleneck with the file system.
> >
> > Regards
> > Bhaskar
> >
> >
> >
> >
> >
> > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     I had a similar problem.   I ended up solving by not relying on
> >     checkpoints for recovery and instead re-read my input sources (in my
> >     case a kafka topic) from the earliest offset and rebuilding only the
> >     state I need.  I only need to care about the past 1 to 2 days of
> >     state so can afford to drop anything older.   My recovery time went
> >     from over an hour for just the first checkpoint to under 10 minutes.
> >
> >     Tim
> >
> >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com
> >     <ma...@live.com>> wrote:
> >
> >         Hi Jeff
> >
> >          1. "after around 50GB of state, I stop being able to reliably
> >             take checkpoints or savepoints. "
> >             What is the exact reason that job cannot complete
> >             checkpoint? Expired before completing or decline by some
> >             tasks? The former one is manly caused by high back-pressure
> >             and the later one is mainly due to some internal error.
> >          2. Have you checked what reason the remote task manager is lost?
> >             If the remote task manager is not crashed, it might be due
> >             to GC impact, I think you might need to check task-manager
> >             logs and GC logs.
> >
> >         Best
> >         Yun Tang
> >
>  ------------------------------------------------------------------------
> >         *From:* Jeff Henrikson <jehenrik27@gmail.com
> >         <ma...@gmail.com>>
> >         *Sent:* Thursday, June 18, 2020 1:46
> >         *To:* user <user@flink.apache.org <mailto:user@flink.apache.org
> >>
> >         *Subject:* Trouble with large state
> >         Hello Flink users,
> >
> >         I have an application of around 10 enrichment joins.  All events
> >         are
> >         read from kafka and have event timestamps.  The joins are built
> >         using
> >         .cogroup, with a global window, triggering on every 1 event,
> plus a
> >         custom evictor that drops records once a newer record for the
> >         same ID
> >         has been processed.  Deletes are represented by empty events with
> >         timestamp and ID (tombstones). That way, we can drop records when
> >         business logic dictates, as opposed to when a maximum retention
> >         has been
> >         attained.  The application runs RocksDBStateBackend, on
> >         Kubernetes on
> >         AWS with local SSDs.
> >
> >         Unit tests show that the joins produce expected results.  On an
> >         8 node
> >         cluster, watermark output progress seems to indicate I should be
> >         able to
> >         bootstrap my state of around 500GB in around 1 day.  I am able
> >         to save
> >         and restore savepoints for the first half an hour of run time.
> >
> >         My current trouble is that after around 50GB of state, I stop
> >         being able
> >         to reliably take checkpoints or savepoints.  Some time after
> >         that, I
> >         start getting a variety of failures where the first suspicious
> >         log event
> >         is a generic cluster connectivity error, such as:
> >
> >               1) java.io.IOException: Connecting the channel failed:
> >         Connecting
> >               to remote task manager + '/10.67.7.101:38955
> >         <http://10.67.7.101:38955>' has failed. This
> >               might indicate that the remote task manager has been lost.
> >
> >               2) org.apache.flink.runtime.io.network.netty.exception
> >               .RemoteTransportException: Connection unexpectedly closed
> >         by remote
> >               task manager 'null'. This might indicate that the remote
> task
> >               manager was lost.
> >
> >               3) Association with remote system
> >               [akka.tcp://flink@10.67.6.66:34987
> >         <http://flink@10.67.6.66:34987>] has failed, address is now
> >               gated for [50] ms. Reason: [Association failed with
> >               [akka.tcp://flink@10.67.6.66:34987
> >         <http://flink@10.67.6.66:34987>]] Caused by:
> >               [java.net.NoRouteToHostException: No route to host]
> >
> >         I don't see any obvious out of memory errors on the TaskManager
> UI.
> >
> >         Adding nodes to the cluster does not seem to increase the maximum
> >         savable state size.
> >
> >         I could enable HA, but for the time being I have been leaving it
> >         out to
> >         avoid the possibility of masking deterministic faults.
> >
> >         Below are my configurations.
> >
> >         Thanks in advance for any advice.
> >
> >         Regards,
> >
> >
> >         Jeff Henrikson
> >
> >
> >
> >         Flink version: 1.10
> >
> >         Configuration set via code:
> >               parallelism=8
> >               maxParallelism=64
> >               setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >               setTolerableCheckpointFailureNumber(1000)
> >               setMaxConcurrentCheckpoints(1)
> >
> >
>  enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> >               RocksDBStateBackend
> >               setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> >               setNumberOfTransferThreads(25)
> >               setDbStoragePath points to a local nvme SSD
> >
> >         Configuration in flink-conf.yaml:
> >
> >               jobmanager.rpc.address: localhost
> >               jobmanager.rpc.port: 6123
> >               jobmanager.heap.size: 28000m
> >               taskmanager.memory.process.size: 28000m
> >               taskmanager.memory.jvm-metaspace.size: 512m
> >               taskmanager.numberOfTaskSlots: 1
> >               parallelism.default: 1
> >               jobmanager.execution.failover-strategy: full
> >
> >               cluster.evenly-spread-out-slots: false
> >
> >               taskmanager.memory.network.fraction: 0.2           # default 0.1
> >               taskmanager.memory.framework.off-heap.size: 2GB
> >               taskmanager.memory.task.off-heap.size: 2GB
> >               taskmanager.network.memory.buffers-per-channel: 32 # default 2
> >               taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
> >               taskmanager.memory.task.off-heap.size: 2048MB      # default 128M
> >
> >               state.backend.fs.memory-threshold: 1048576
> >               state.backend.fs.write-buffer-size: 10240000
> >               state.backend.local-recovery: true
> >               state.backend.rocksdb.writebuffer.size: 64MB
> >               state.backend.rocksdb.writebuffer.count: 8
> >               state.backend.rocksdb.writebuffer.number-to-merge: 4
> >               state.backend.rocksdb.timer-service.factory: heap
> >               state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
> >               state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
> >
> >               web.checkpoints.history: 250
> >
>

Re: Trouble with large state

Posted by Jeff Henrikson <je...@gmail.com>.
Vijay,

Thanks for your thoughts.  Below are answers to your questions.

 > 1. What's your checkpoint interval?

I have used many different checkpoint intervals, ranging from 5 minutes 
to never.  I usually set setMinPauseBetweenCheckpoints to the same value 
as the checkpoint interval.

 > 2. How frequently are you updating the state into RocksDB?

My understanding is that for .cogroup:

   - Triggers control communication outside the operator
   - Evictors control cleanup of internal state
   - Configurations like write buffer size control the frequency of 
state change at the storage layer
   - There is no control for how frequently the window state updates at 
the RocksDB API layer.

Thus, the state updates whenever data is ingested.
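
To make the eviction rule concrete, here is a minimal sketch in plain Java of "keep only the newest record per ID, with tombstones as deletes".  It is not Flink code and not the actual evictor: a HashMap stands in for window state, and the class and method names (LatestPerIdSketch, Event, accept, liveValue, liveCount) are made up for illustration.  It also assumes that a tombstone is retained as the latest event for its ID so that it supersedes older records arriving late.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-ID window state; not a Flink Evictor.
final class LatestPerIdSketch {

    static final class Event {
        final String id;
        final long timestamp;
        final String payload; // null payload marks a tombstone (delete)

        Event(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }

        boolean isTombstone() {
            return payload == null;
        }
    }

    private final Map<String, Event> latest = new HashMap<>();

    // Keep the incoming event only if it is at least as new as what is stored;
    // the older record for the same ID is dropped ("evicted").
    void accept(Event e) {
        Event current = latest.get(e.id);
        if (current == null || e.timestamp >= current.timestamp) {
            latest.put(e.id, e);
        }
    }

    // Value visible to the join: null if the ID is unknown or deleted.
    String liveValue(String id) {
        Event e = latest.get(id);
        return (e == null || e.isTombstone()) ? null : e.payload;
    }

    // Number of IDs that currently hold a non-deleted record.
    long liveCount() {
        return latest.values().stream().filter(e -> !e.isTombstone()).count();
    }

    public static void main(String[] args) {
        LatestPerIdSketch state = new LatestPerIdSketch();
        state.accept(new Event("a", 1L, "v1"));
        state.accept(new Event("a", 3L, "v2"));   // supersedes v1
        state.accept(new Event("a", 2L, "late")); // older than v2, ignored
        state.accept(new Event("a", 5L, null));   // tombstone deletes "a"
        System.out.println(state.liveValue("a") + " " + state.liveCount());
    }
}
```

Note that in this sketch the tombstone itself still occupies state; a real implementation would presumably need some rule for eventually dropping it.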

 > 3. How many task managers are you using?

Usually I have been running with one slot per taskmanager.  28GB of 
usable ram on each node.

 > 4. How much data each task manager handles while taking the checkpoint?

Funny you should ask.  I would be okay with zero.

The application I am replacing has a latency of 36-48 hours, so if I had 
to fully stop processing to take every snapshot synchronously, it might 
be seen as totally acceptable, especially for initial bootstrap.  Also, 
the velocity of running this backfill is approximately 115x real time on 
8 nodes, so the steady-state run may not exhibit the failure mode in 
question at all.

It has come as some frustration to me that, in the case of 
RocksDBStateBackend, the configuration key state.backend.async 
cannot meaningfully be set to false.

The only way I have found in the existing code to get behavior like a 
synchronous snapshot is to POST to /jobs/<jobID>/stop with drain=false 
and a URL.  This method of failing fast is the way that I discovered 
that I needed to increase transfer threads from the default.
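
For reference, a minimal sketch of building that stop request with the JDK 11 HTTP client.  The JSON field names targetDirectory and drain are what I understand the Flink REST API to expect for stop-with-savepoint; please verify them against the REST docs for your version.  The host, port, job ID and savepoint location below are placeholders, and the class and method names are made up.  The body builder does no JSON escaping, so it only suits simple paths.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; builds the request but does not send it.
final class StopWithSavepointSketch {

    // JSON body for POST /jobs/<jobID>/stop (field names assumed, see above).
    static String body(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\":\"" + targetDirectory + "\",\"drain\":" + drain + "}";
    }

    // restBase is the JobManager REST endpoint, e.g. http://localhost:8081
    static HttpRequest request(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, false)))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own endpoint, job ID and savepoint URL.
        HttpRequest req = request(
                "http://localhost:8081",
                "00000000000000000000000000000000",
                "s3://example-bucket/savepoints");
        System.out.println(req.method() + " " + req.uri());
        // To send it:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(req, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```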

The reason I don't just run the whole backfill and then take one 
snapshot is that even in the absence of checkpoints, a very similar 
congestion seems to take the cluster down when I am, say, 20-30% of the 
way through my backfill.

Reloading from my largest feasible snapshot makes it possible to make 
another snapshot a bit larger before crash, but not by much.

At first glance, the code change to allow RocksDBStateBackend to run in a 
synchronous snapshot mode looks pretty easy.  Nevertheless, I was 
hoping to do the initial launch of my application without needing to 
modify the framework.

Regards,


Jeff Henrikson


On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> For me this seems to be an IO bottleneck at your task manager.
> I have a couple of queries:
> 1. What's your checkpoint interval?
> 2. How frequently are you updating the state into RocksDB?
> 3. How many task managers are you using?
> 4. How much data each task manager handles while taking the checkpoint?
> 
> For points (3) and (4) , you should be very careful. I feel you are 
> stuck at this.
> You try to scale vertically by increasing more CPU and memory for each 
> task manager.
> If not, try to scale horizontally so that each task manager IO gets reduces
> Apart from that check is there any bottleneck with the file system.
> 
> Regards
> Bhaskar
> 

Re: Trouble with large state

Posted by Vijay Bhaskar <bh...@gmail.com>.
To me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data does each task manager handle while taking the checkpoint?

For points (3) and (4), you should be very careful. I feel you are stuck
at this.
Try to scale vertically by adding more CPU and memory to each task
manager.
If not, try to scale horizontally so that the IO on each task manager is reduced.
Apart from that, check whether there is any bottleneck with the file system.

Regards
Bhaskar





On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vi...@gmail.com> wrote:

> I had a similar problem.   I ended up solving by not relying on
> checkpoints for recovery and instead re-read my input sources (in my case a
> kafka topic) from the earliest offset and rebuilding only the state I
> need.  I only need to care about the past 1 to 2 days of state so can
> afford to drop anything older.   My recovery time went from over an hour
> for just the first checkpoint to under 10 minutes.
>
> Tim

Re: Trouble with large state

Posted by Timothy Victor <vi...@gmail.com>.
I had a similar problem.   I ended up solving it by not relying on checkpoints
for recovery and instead re-reading my input sources (in my case a kafka
topic) from the earliest offset and rebuilding only the state I need.  I
only need to care about the past 1 to 2 days of state so can afford to drop
anything older.   My recovery time went from over an hour for just the
first checkpoint to under 10 minutes.

Tim

On Wed, Jun 17, 2020, 11:52 PM Yun Tang <my...@live.com> wrote:

> Hi Jeff
>
>
>    1. " after around 50GB of state, I stop being able to reliably take
>    checkpoints or savepoints. "
>    What is the exact reason that job cannot complete checkpoint? Expired
>    before completing or decline by some tasks? The former one is manly caused
>    by high back-pressure and the later one is mainly due to some internal
>    error.
>    2. Have you checked what reason the remote task manager is lost?
>    If the remote task manager is not crashed, it might be due to GC
>    impact, I think you might need to check task-manager logs and GC logs.
>
> Best
> Yun Tang

Re: Trouble with large state

Posted by Yun Tang <my...@live.com>.
Hi Jeff


  1.  "after around 50GB of state, I stop being able to reliably take checkpoints or savepoints."
What is the exact reason the job cannot complete a checkpoint? Did it expire before completing, or was it declined by some tasks? The former is mainly caused by high back-pressure, and the latter is mainly due to some internal error.
  2.  Have you checked why the remote task manager was lost?
If the remote task manager did not crash, it might be due to GC impact. I think you should check the task-manager logs and GC logs.

Best
Yun Tang