You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexis Sarda-Espinosa <al...@microfocus.com> on 2022/04/08 14:16:34 UTC

RocksDB's state size discrepancy with what's seen with state processor API

Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I've been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job's setup:


  *   Windows are 11 minutes in size.
  *   Slide time is 1 minute.
  *   Throughput is approximately 20 events per minute.

I have 3 operators with these states:


  1.  Window state with ListState<Integer> and no TTL.
  2.  Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
  3.  Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.

Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().

I have now analyzed the checkpoint folder with the state processor API, and I'll note here that I see 50 folders named chk-*** even though I don't set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:


  1.  10 entries
  2.  80 entries
  3.  200 entries

I got those numbers with something like this:

savepoint
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
        .process(...)
        .collect()
        .parallelStream()
        .reduce(0, Integer::sum);

Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.


Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Yun Tang <my...@live.com>.
Hi Alexis,

Sorry for the late response. I come from the reply in FLINK-27504[1].
The MAINFEST file in RocksDB records history of version changes.

In other words, once a new SST file created or an old file deleted via compaction, it will create a new version in RocksDB, which will update the MAINFEST file.
The default value for max MAINFEST file size is 1GB [2], since you create the checkpoint every 30 seconds, files might be flushed on that time, and that's why the MAINFEST file grows.

You can limit the max file size via DBOptions#setMaxManifestFileSize [3].


[1] https://issues.apache.org/jira/browse/FLINK-27504?focusedCommentId=17537788&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17537788
[2] https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/include/rocksdb/options.h#L636
[3] https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/src/main/java/org/rocksdb/DBOptions.java#L520

Best
Yun Tang
________________________________
From: Alexis Sarda-Espinosa <al...@microfocus.com>
Sent: Tuesday, May 3, 2022 8:47
To: Peter Brucia <pe...@ververica.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: RE: RocksDB's state size discrepancy with what's seen with state processor API


Ok



Regards,

Alexis.



From: Peter Brucia <pe...@ververica.com>
Sent: Freitag, 22. April 2022 15:31
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API



No

Sent from my iPhone



RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Ok

Regards,
Alexis.

From: Peter Brucia <pe...@ververica.com>
Sent: Freitag, 22. April 2022 15:31
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

No
Sent from my iPhone


Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi David,

I don't find it troublesome per se, I was rather trying to understand what should be expected (and documented) for my application. Before I restarted the job and changed some configurations, it ran for around 10 days and ended up with a state size of about 1.8GB, so I'm still not sure what is the upper bound in my scenario, or if that amount of "uncompacted garbage" is normal or not (for our throughput). This is important for us because we need to know how to size (disk space) the infrastructure, although it is also having a big impact on timings because each checkpoint ends up requiring 30+ seconds to complete, and they will eventually time out for sure.

I understand RocksDB has different sophisticated mechanisms, so I certainly don't expect one magic button that does exactly what I want, but ideally there would be a way to tune configuration in a way that a rough upper bound estimate of disk space can be deduced. Having some expired state for a while is expected, what I find odd is that it grows so fast, the size of the state quickly outpaces the size of processed events, even though the state only persists a subset of information (some integer ids, string ids, longs for epochs).

At this point I think I can conclude that the "live" state from my operators is not growing indefinitely (based on what I see with the state processor API), so is there a way to get a better estimate of disk utilization other than letting the job run and wait? I've been reading through RocksDB documentation as well, but that might not be enough because I don't know how Flink handles its own framework state internally.

Regards,
Alexis.

________________________________
From: David Anderson <da...@apache.org>
Sent: Friday, April 22, 2022 9:57 AM
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: roman@apache.org <ro...@apache.org>; user@flink.apache.org <us...@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Alexis,

Compaction isn't an all-at-once procedure. RocksDB is organized as a series of levels, each 10x larger than the one below. There are a few different compaction algorithms available, and they are tunable, but what's typically happening during compaction is that one SST file at level n is being merged into the relevant SST files at level n+1. During this compaction procedure, obsolete and deleted entries are cleaned up. And several such compactions can be occurring concurrently. (Not to mention that each TM has its own independent RocksDB instance.)

It's not unusual for jobs with a small amount of state to end up with checkpoints of a few hundred MBs in size, where a lot of that is uncompacted garbage. If you find this troublesome, you could configure RocksDB to compact more frequently.

David

On Thu, Apr 21, 2022 at 12:49 PM Alexis Sarda-Espinosa <al...@microfocus.com>> wrote:
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. After changing the configuration YAML, I restarted the cluster with a savepoint, and I can see that it only used 5.6MB on disk. Consequently, after the job switched to running state, the new checkpoints were also a few MB in size. After running for 1 day, checkpoint size is now around 100MB. From the metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I see a few with 151 coming from flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint shows that the effective amount of data is so low, size growth still seems far too large. In fact, if I only look at the UI, Bytes Received for the relevant SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 75MB for said SubTasks.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org>>
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa <al...@microfocus.com>>
Cc: user@flink.apache.org<ma...@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

State Processor API works on a higher level and is not aware of any RocksDB specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

<al...@microfocus.com>> wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point anyway. However, going back to the original question, is there no way to gain more insight into this with the state processor API? You've mentioned potential issues (too many states, missing compaction) but, with my admittedly limited understanding of the way RocksDB is used in Flink, I would have thought that such things would be visible when using the state processor. Is there no way for me to "parse" those MANIFEST files with some of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>>
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa <al...@microfocus.com>>
> Cc: user@flink.apache.org<ma...@flink.apache.org> <us...@flink.apache.org>>
> Subject: Re: RocksDB's state size discrepancy with what's seen with
> state processor API
>
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> etric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
> <al...@microfocus.com>> wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once and used like this:
> >
> > - Window list state: each call to process() executes
> > context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes
> > context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
> >
> > [1] https://stackoverflow.com/a/50510054/5793905
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>>
> > Sent: Dienstag, 19. April 2022 11:48
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>>
> > Cc: user@flink.apache.org<ma...@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for the information,
> >
> > MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
> > Could you please confirm that the number of state names is constant?
> >
> > > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> > That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
> >
> > > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> > The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
> >
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org>> wrote:
> > >
> > > /shared folder contains keyed state that is shared among different
> > > checkpoints [1]. Most of state should be shared in your case since
> > > you're using keyed state and incremental checkpoints.
> > >
> > > When a checkpoint is loaded, the state that it shares with older
> > > checkpoints is loaded as well. I suggested to load different
> > > checkpoints (i.e. chk-* folders) and compare the numbers of
> > > objects in their states. To prevent the job from discarding the
> > > state, it can either be stopped for some time and then restarted
> > > from the latest checkpoint; or the number of retained checkpoints
> > > can be increased [2]. Copying isn't necessary.
> > >
> > > Besides that, you can also check state sizes of operator in Flink
> > > Web UI (but not the sizes of individual states). If the operators
> > > are chained then their combined state size will be shown. To
> > > prevent this, you can disable chaining [3] (although this will
> > > have performance impact).
> > >
> > > Individual checkpoint folders should be eventually removed (when
> > > the checkpoint is subsumed). However, this is not guaranteed: if
> > > there is any problem during deletion, it will be logged, but the
> > > job will not fail.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/stat
> > > e/ch
> > > eckpoints/#directory-structure
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployme
> > > nt/c onfig/#state-checkpoints-num-retained
> > > [3]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/data
> > > stre am/operators/overview/#disable-chaining
> > >
> > > Regards,
> > > Roman
> > >
> > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> > > <al...@microfocus.com>> wrote:
> > > >
> > > > Hi Roman,
> > > >
> > > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > > >
> > > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > > >
> > > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > -----Original Message-----
> > > > From: Roman Khachatryan <ro...@apache.org>>
> > > > Sent: Dienstag, 12. April 2022 12:37
> > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>>
> > > > Cc: user@flink.apache.org<ma...@flink.apache.org>
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > with state processor API
> > > >
> > > > Hi Alexis,
> > > >
> > > > Thanks a lot for sharing this. I think the program is correct.
> > > > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > > > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > > >
> > > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > > >
> > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com>> wrote:
> > > > >
> > > > > Some additional information that I’ve gathered:
> > > > >
> > > > >
> > > > >
> > > > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db
> > > > > 678
> > > > >
> > > > >
> > > > >
> > > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > > >
> > > > >
> > > > >
> > > > > state.backend: rocksdb
> > > > >
> > > > > state.backend.incremental: true
> > > > >
> > > > > execution.checkpointing.interval: 30 s
> > > > >
> > > > > execution.checkpointing.min-pause: 25 s
> > > > >
> > > > > execution.checkpointing.timeout: 5 min
> > > > >
> > > > > execution.savepoint-restore-mode: CLAIM
> > > > >
> > > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > > RETAIN_ON_CANCELLATION
> > > > >
> > > > >
> > > > >
> > > > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > From: Alexis Sarda-Espinosa
> > > > > <al...@microfocus.com>>
> > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > To: roman@apache.org<ma...@apache.org>
> > > > > Cc: user@flink.apache.org<ma...@flink.apache.org>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > >
> > > > >
> > > > > Here's an example of a WindowReaderFunction:
> > > > >
> > > > >
> > > > >
> > > > >     public class StateReaderFunction extends
> > > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > >
> > > > >         private static final ListStateDescriptor<Integer> LSD
> > > > > = new ListStateDescriptor<>(
> > > > >
> > > > >                 "descriptorId",
> > > > >
> > > > >                 Integer.class
> > > > >
> > > > >         );
> > > > >
> > > > >
> > > > >
> > > > >         @Override
> > > > >
> > > > >         public void readWindow(String s, Context<TimeWindow>
> > > > > context, Iterable<Pojo> elements, Collector<Integer> out)
> > > > > throws Exception {
> > > > >
> > > > >             int count = 0;
> > > > >
> > > > >             for (Integer i :
> > > > > context.windowState().getListState(LSD).get()) {
> > > > >
> > > > >                 count++;
> > > > >
> > > > >             }
> > > > >
> > > > >             out.collect(count);
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >
> > > > >
> > > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > > >
> > > > >
> > > > >
> > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > > >
> > > > >
> > > > >
> > > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > ________________________________
> > > > >
> > > > > From: Roman Khachatryan <ro...@apache.org>>
> > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > To: Alexis Sarda-Espinosa
> > > > > <al...@microfocus.com>>
> > > > > Cc: user@flink.apache.org<ma...@flink.apache.org> <us...@flink.apache.org>>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > If I understand correctly, the provided StateProcessor program
> > > > > gives you the number of stream elements per operator. However,
> > > > > you mentioned that these operators have collection-type states
> > > > > (ListState and MapState). That means that per one entry there
> > > > > can be an arbitrary number of state elements.
> > > > >
> > > > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > > > >
> > > > > > The other operator does override and call clear()
> > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > > >
> > > > > [1]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api
> > > > > /jav
> > > > > a/or
> > > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState
> > > > > -jav
> > > > > a.la<http://a.la>
> > > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReade
> > > > > rFun
> > > > > ctio
> > > > > n-
> > > > >
> > > > > [2]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/
> > > > > java
> > > > > /org
> > > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > > html#clear-org.apache.flink.streaming.api.functions.windowing.
> > > > > Proc
> > > > > essW
> > > > > indowFunction.Context-
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > > > <al...@microfocus.com>> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Windows are 11 minutes in size.
> > > > > > Slide time is 1 minute.
> > > > > > Throughput is approximately 20 events per minute.
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have 3 operators with these states:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Window state with ListState<Integer> and no TTL.
> > > > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > > > >
> > > > > >
> > > > > >
> > > > > > 10 entries
> > > > > > 80 entries
> > > > > > 200 entries
> > > > > >
> > > > > >
> > > > > >
> > > > > > I got those numbers with something like this:
> > > > > >
> > > > > >
> > > > > >
> > > > > > savepoint
> > > > > >
> > > > > >
> > > > > > .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > > Time.minutes(1L)))
> > > > > >
> > > > > >         .process(...)
> > > > > >
> > > > > >         .collect()
> > > > > >
> > > > > >         .parallelStream()
> > > > > >
> > > > > >         .reduce(0, Integer::sum);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by David Anderson <da...@apache.org>.
Alexis,

Compaction isn't an all-at-once procedure. RocksDB is organized as a series
of levels, each 10x larger than the one below. There are a few different
compaction algorithms available, and they are tunable, but what's typically
happening during compaction is that one SST file at level n is being merged
into the relevant SST files at level n+1. During this compaction procedure,
obsolete and deleted entries are cleaned up. And several such compactions
can be occurring concurrently. (Not to mention that each TM has its own
independent RocksDB instance.)

It's not unusual for jobs with a small amount of state to end up with
checkpoints of a few hundred MBs in size, where a lot of that is
uncompacted garbage. If you find this troublesome, you could configure
RocksDB to compact more frequently.

David

On Thu, Apr 21, 2022 at 12:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espinosa@microfocus.com> wrote:

> Hello,
>
> I enabled some of the RocksDB metrics and I noticed some additional
> things. After changing the configuration YAML, I restarted the cluster with
> a savepoint, and I can see that it only used 5.6MB on disk. Consequently,
> after the job switched to running state, the new checkpoints were also a
> few MB in size. After running for 1 day, checkpoint size is now around
> 100MB. From the metrics I can see with the Prometheus reporter:
>
> - All entries for num-live-versions show 1
> - All entries for compaction-pending show 0
> - Most entries for estimate-num-keys are in the range of 0 to 100,
> although I see a few with 151 coming from
> flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys
>
> Is compaction expected after only 100MB? I imagine not, but if the
> savepoint shows that the effective amount of data is so low, size growth
> still seems far too large. In fact, if I only look at the UI, Bytes
> Received for the relevant SubTasks is about 14MB, yet the latest checkpoint
> already shows a Data Size of 75MB for said SubTasks.
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Mittwoch, 20. April 2022 10:37
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state
> processor API
>
> State Processor API works on a higher level and is not aware of any
> RocksDB specifics (in fact, it can be used with any backend).
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa
>
> <al...@microfocus.com> wrote:
> >
> > I can look into RocksDB metrics, I need to configure Prometheus at some
> point anyway. However, going back to the original question, is there no way
> to gain more insight into this with the state processor API? You've
> mentioned potential issues (too many states, missing compaction) but, with
> my admittedly limited understanding of the way RocksDB is used in Flink, I
> would have thought that such things would be visible when using the state
> processor. Is there no way for me to "parse" those MANIFEST files with some
> of Flink's classes and get some more hints?
> >
> > Regards,
> > Alexis.
> >
> > ________________________________
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Tuesday, April 19, 2022 5:51 PM
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org <us...@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > state processor API
> >
> > > I assume that when you say "new states", that is related to new
> descriptors with different names? Because, in the case of windowing for
> example, each window "instance" has its own scoped (non-global and keyed)
> state, but that's not regarded as a separate column family, is it?
> > Yes, that's what I meant, and that's regarded as the same column family.
> >
> > Another possible reason is that SST files aren't being compacted and
> > that increases the MANIFEST file size.
> > I'd check the total number of loaded SST files and the creation date
> > of the oldest one.
> >
> > You can also see whether there are any compactions running via RocksDB
> > metrics [1] [2] (a reporter needs to be configured [3]).
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-backend-rocksdb-metrics-num-running-compactions
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-backend-rocksdb-metrics-compaction-pending
> > [3]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> > etric_reporters/#reporters
> >
> > Regards,
> > Roman
> >
> > On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
> > <al...@microfocus.com> wrote:
> > >
> > > Hi Roman,
> > >
> > > I assume that when you say "new states", that is related to new
> descriptors with different names? Because, in the case of windowing for
> example, each window "instance" has its own scoped (non-global and keyed)
> state, but that's not regarded as a separate column family, is it?
> > >
> > > For the 3 descriptors I mentioned before, they are only instantiated
> once and used like this:
> > >
> > > - Window list state: each call to process() executes
> > > context.windowState().getListState(...).get()
> > > - Global map state: each call to process() executes
> > > context.globalState().getMapState(...)
> > > - Global list state: within open(), runtimeContext.getListState(...)
> is executed once and used throughout the life of the operator.
> > >
> > > According to [1], the two ways of using global state should be
> equivalent.
> > >
> > > I will say that some of the operators instantiate the state descriptor
> in their constructors, i.e. before they are serialized to the TM, but the
> descriptors are Serializable, so I imagine that's not relevant.
> > >
> > > [1] https://stackoverflow.com/a/50510054/5793905
> > >
> > > Regards,
> > > Alexis.
> > >
> > > -----Original Message-----
> > > From: Roman Khachatryan <ro...@apache.org>
> > > Sent: Dienstag, 19. April 2022 11:48
> > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Cc: user@flink.apache.org
> > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > state processor API
> > >
> > > Hi Alexis,
> > >
> > > Thanks a lot for the information,
> > >
> > > MANIFEST files list RocksDB column families (among other info); ever
> growing size of these files might indicate that some new states are
> constantly being created.
> > > Could you please confirm that the number of state names is constant?
> > >
> > > > Could you confirm if Flink's own operators could be creating state
> in RocksDB? I assume the window operators save some information in the
> state as well.
> > > That's correct, window operators maintain a list of elements per
> window and a set of timers (timestamps). These states' names should be
> fixed (something like "window-contents" and "window-timers").
> > >
> > > > is that related to managed state used by my functions? Or does that
> indicate size growth is elsewhere?
> > > The same mechanism is used for both Flink internal state and operator
> state, so it's hard to say without at least knowing the state names.
> > >
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org>
> wrote:
> > > >
> > > > /shared folder contains keyed state that is shared among different
> > > > checkpoints [1]. Most of state should be shared in your case since
> > > > you're using keyed state and incremental checkpoints.
> > > >
> > > > When a checkpoint is loaded, the state that it shares with older
> > > > checkpoints is loaded as well. I suggested to load different
> > > > checkpoints (i.e. chk-* folders) and compare the numbers of
> > > > objects in their states. To prevent the job from discarding the
> > > > state, it can either be stopped for some time and then restarted
> > > > from the latest checkpoint; or the number of retained checkpoints
> > > > can be increased [2]. Copying isn't necessary.
> > > >
> > > > Besides that, you can also check state sizes of operator in Flink
> > > > Web UI (but not the sizes of individual states). If the operators
> > > > are chained then their combined state size will be shown. To
> > > > prevent this, you can disable chaining [3] (although this will
> > > > have performance impact).
> > > >
> > > > Individual checkpoint folders should be eventually removed (when
> > > > the checkpoint is subsumed). However, this is not guaranteed: if
> > > > there is any problem during deletion, it will be logged, but the
> > > > job will not fail.
> > > >
> > > > [1]
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/stat
> > > > e/ch
> > > > eckpoints/#directory-structure
> > > > [2]
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployme
> > > > nt/c onfig/#state-checkpoints-num-retained
> > > > [3]
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/data
> > > > stre am/operators/overview/#disable-chaining
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> > > > <al...@microfocus.com> wrote:
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > > Maybe I'm misunderstanding the structure of the data within the
> checkpoint. You suggest comparing counts of objects in different
> checkpoints, I assume you mean copying my "checkpoints" folder at different
> times and comparing, not comparing different "chk-*" folders in the same
> snapshot, right?
> > > > >
> > > > > I haven't executed the processor program with a newer checkpoint,
> but I did look at the folder in the running system, and I noticed that most
> of the chk-* folders have remained unchanged, there's only 1 or 2 new
> folders corresponding to newer checkpoints. I would think this makes sense
> since the configuration specifies that only 1 completed checkpoint should
> be retained, but then why are the older chk-* folders still there? I did
> trigger a manual restart of the Flink cluster in the past (before starting
> the long-running test), but if my policy is to CLAIM the checkpoint,
> Flink's documentation states that it would be cleaned eventually.
> > > > >
> > > > > Moreover, just by looking at folder sizes with "du", I can see
> that most of the state is held in the "shared" folder, and that has grown
> for sure; I'm not sure what "shared" usually holds, but if that's what's
> growing, maybe I can rule out expired state staying around?. My pipeline
> doesn't use timers, although I guess Flink itself may use them. Is there
> any way I could get some insight into which operator holds larger states?
> > > > >
> > > > > Regards,
> > > > > Alexis.
> > > > >
> > > > > -----Original Message-----
> > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > Sent: Dienstag, 12. April 2022 12:37
> > > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > > Cc: user@flink.apache.org
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > with state processor API
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > Thanks a lot for sharing this. I think the program is correct.
> > > > > Although it doesn't take timers into account; and to estimate the
> state size more accurately, you could also use the same serializers used by
> the job.
> > > > > But maybe it makes more sense to compare the counts of objects in
> different checkpoints and see which state is growing.
> > > > >
> > > > > If the number of keys is small, compaction should eventually clean
> up the old values, given that the windows eventually expire. I think it
> makes sense to check that watermarks in all windows are making progress.
> > > > >
> > > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect
> the results of the State Processor program.
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <
> alexis.sarda-espinosa@microfocus.com> wrote:
> > > > > >
> > > > > > Some additional information that I’ve gathered:
> > > > > >
> > > > > >
> > > > > >
> > > > > > The number of unique keys in the system is 10, and that is
> correctly reflected in the state.
> > > > > > TTL for global window state is set to update on read and write,
> but the code has logic to remove old state based on event time.
> > > > > > Not sure it’s relevant, but the Flink cluster does run with
> jemalloc enabled.
> > > > > > GitHub gist with the whole processor setup since it’s not too
> long:
> > > > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db
> > > > > > 678
> > > > > >
> > > > > >
> > > > > >
> > > > > > Relevant configuration entries (explicitly set, others are left
> with defaults):
> > > > > >
> > > > > >
> > > > > >
> > > > > > state.backend: rocksdb
> > > > > >
> > > > > > state.backend.incremental: true
> > > > > >
> > > > > > execution.checkpointing.interval: 30 s
> > > > > >
> > > > > > execution.checkpointing.min-pause: 25 s
> > > > > >
> > > > > > execution.checkpointing.timeout: 5 min
> > > > > >
> > > > > > execution.savepoint-restore-mode: CLAIM
> > > > > >
> > > > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > > > RETAIN_ON_CANCELLATION
> > > > > >
> > > > > >
> > > > > >
> > > > > > Over the weekend, state size has grown to 1.23GB with the
> operators referenced in the processor program taking 849MB, so I’m still
> pretty puzzled. I thought it could be due to expired state being retained,
> but I think that doesn’t make sense if I have finite keys, right?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >
> > > > > >
> > > > > > From: Alexis Sarda-Espinosa
> > > > > > <al...@microfocus.com>
> > > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > > To: roman@apache.org
> > > > > > Cc: user@flink.apache.org
> > > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > > with state processor API
> > > > > >
> > > > > >
> > > > > >
> > > > > > Hi Roman,
> > > > > >
> > > > > >
> > > > > >
> > > > > > Here's an example of a WindowReaderFunction:
> > > > > >
> > > > > >
> > > > > >
> > > > > >     public class StateReaderFunction extends
> > > > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > > >
> > > > > >         private static final ListStateDescriptor<Integer> LSD
> > > > > > = new ListStateDescriptor<>(
> > > > > >
> > > > > >                 "descriptorId",
> > > > > >
> > > > > >                 Integer.class
> > > > > >
> > > > > >         );
> > > > > >
> > > > > >
> > > > > >
> > > > > >         @Override
> > > > > >
> > > > > >         public void readWindow(String s, Context<TimeWindow>
> > > > > > context, Iterable<Pojo> elements, Collector<Integer> out)
> > > > > > throws Exception {
> > > > > >
> > > > > >             int count = 0;
> > > > > >
> > > > > >             for (Integer i :
> > > > > > context.windowState().getListState(LSD).get()) {
> > > > > >
> > > > > >                 count++;
> > > > > >
> > > > > >             }
> > > > > >
> > > > > >             out.collect(count);
> > > > > >
> > > > > >         }
> > > > > >
> > > > > >     }
> > > > > >
> > > > > >
> > > > > >
> > > > > > That's for the operator that uses window state. The other
> readers do something similar but with context.globalState(). That should
> provide the number of state entries for each key+window combination, no?
> And after collecting all results, I would get the number of state entries
> across all keys+windows for an operator.
> > > > > >
> > > > > >
> > > > > >
> > > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call
> context.windowState().getListState(...).clear().
> > > > > >
> > > > > >
> > > > > >
> > > > > > Side note: in the state processor program I call
> ExecutionEnvironment#setParallelism(1) even though my streaming job runs
> with parallelism=4, this doesn't affect the result, does it?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >
> > > > > >
> > > > > > ________________________________
> > > > > >
> > > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > > To: Alexis Sarda-Espinosa
> > > > > > <al...@microfocus.com>
> > > > > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > > with state processor API
> > > > > >
> > > > > >
> > > > > >
> > > > > > Hi Alexis,
> > > > > >
> > > > > > If I understand correctly, the provided StateProcessor program
> > > > > > gives you the number of stream elements per operator. However,
> > > > > > you mentioned that these operators have collection-type states
> > > > > > (ListState and MapState). That means that per one entry there
> > > > > > can be an arbitrary number of state elements.
> > > > > >
> > > > > > Have you tried estimating the state sizes directly via
> readKeyedState[1]?
> > > > > >
> > > > > > > The other operator does override and call clear()
> > > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2],
> right?
> > > > > >
> > > > > > [1]
> > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api
> > > > > > /jav
> > > > > > a/or
> > > > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState
> > > > > > -jav
> > > > > > a.la
> > > > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReade
> > > > > > rFun
> > > > > > ctio
> > > > > > n-
> > > > > >
> > > > > > [2]
> > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/
> > > > > > java
> > > > > > /org
> > > > > >
> /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > > > html#clear-org.apache.flink.streaming.api.functions.windowing.
> > > > > > Proc
> > > > > > essW
> > > > > > indowFunction.Context-
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > > >
> > > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > > > > <al...@microfocus.com> wrote:
> > > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I have a streaming job running on Flink 1.14.4 that uses
> managed state with RocksDB with incremental checkpoints as backend. I’ve
> been monitoring a dev environment that has been running for the last week
> and I noticed that state size and end-to-end duration have been increasing
> steadily. Currently, duration is 11 seconds and size is 917MB (as shown in
> the UI). The tasks with the largest state (614MB) come from keyed sliding
> windows. Some attributes of this job’s setup:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Windows are 11 minutes in size.
> > > > > > > Slide time is 1 minute.
> > > > > > > Throughput is approximately 20 events per minute.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I have 3 operators with these states:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Window state with ListState<Integer> and no TTL.
> > > > > > > Global window state with MapState<Long, List<String>> and a
> TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > > Global window state with ListState<Pojo> where the Pojo has an
> int and a long, a TTL of 1 hour, and configured with
> cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Both operators with global window state have logic to manually
> remove old state in addition to configured TTL. The other operator does
> override and call clear().
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I have now analyzed the checkpoint folder with the state
> processor API, and I’ll note here that I see 50 folders named chk-*** even
> though I don’t set state.checkpoints.num-retained and the default should be
> 1. I loaded the data from the folder with the highest chk number and I see
> that my operators have these amounts respectively:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > 10 entries
> > > > > > > 80 entries
> > > > > > > 200 entries
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I got those numbers with something like this:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > savepoint
> > > > > > >
> > > > > > >
> > > > > > > .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > > > Time.minutes(1L)))
> > > > > > >
> > > > > > >         .process(...)
> > > > > > >
> > > > > > >         .collect()
> > > > > > >
> > > > > > >         .parallelStream()
> > > > > > >
> > > > > > >         .reduce(0, Integer::sum);
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Where my WindowReaderFunction classes just count the number of
> entries in each call to readWindow.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Those amounts cannot possibly account for 614MB, so what am I
> missing?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Alexis.
> > > > > > >
> > > > > > >
>

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. After changing the configuration YAML, I restarted the cluster with a savepoint, and I can see that it only used 5.6MB on disk. Consequently, after the job switched to running state, the new checkpoints were also a few MB in size. After running for 1 day, checkpoint size is now around 100MB. From the metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I see a few with 151 coming from flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint shows that the effective amount of data is so low, size growth still seems far too large. In fact, if I only look at the UI, Bytes Received for the relevant SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 75MB for said SubTasks.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

State Processor API works on a higher level and is not aware of any RocksDB specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

<al...@microfocus.com> wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point anyway. However, going back to the original question, is there no way to gain more insight into this with the state processor API? You've mentioned potential issues (too many states, missing compaction) but, with my admittedly limited understanding of the way RocksDB is used in Flink, I would have thought that such things would be visible when using the state processor. Is there no way for me to "parse" those MANIFEST files with some of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org <us...@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and 
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date 
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB 
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> etric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa 
> <al...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once and used like this:
> >
> > - Window list state: each call to process() executes 
> > context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes 
> > context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
> >
> > [1] https://stackoverflow.com/a/50510054/5793905
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 19. April 2022 11:48
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for the information,
> >
> > MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
> > Could you please confirm that the number of state names is constant?
> >
> > > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> > That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
> >
> > > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> > The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
> >
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> > >
> > > /shared folder contains keyed state that is shared among different 
> > > checkpoints [1]. Most of state should be shared in your case since 
> > > you're using keyed state and incremental checkpoints.
> > >
> > > When a checkpoint is loaded, the state that it shares with older 
> > > checkpoints is loaded as well. I suggested to load different 
> > > checkpoints (i.e. chk-* folders) and compare the numbers of 
> > > objects in their states. To prevent the job from discarding the 
> > > state, it can either be stopped for some time and then restarted 
> > > from the latest checkpoint; or the number of retained checkpoints 
> > > can be increased [2]. Copying isn't necessary.
> > >
> > > Besides that, you can also check state sizes of operator in Flink 
> > > Web UI (but not the sizes of individual states). If the operators 
> > > are chained then their combined state size will be shown. To 
> > > prevent this, you can disable chaining [3] (although this will 
> > > have performance impact).
> > >
> > > Individual checkpoint folders should be eventually removed (when 
> > > the checkpoint is subsumed). However, this is not guaranteed: if 
> > > there is any problem during deletion, it will be logged, but the 
> > > job will not fail.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/stat
> > > e/ch
> > > eckpoints/#directory-structure
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployme
> > > nt/c onfig/#state-checkpoints-num-retained
> > > [3]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/data
> > > stre am/operators/overview/#disable-chaining
> > >
> > > Regards,
> > > Roman
> > >
> > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
> > > <al...@microfocus.com> wrote:
> > > >
> > > > Hi Roman,
> > > >
> > > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > > >
> > > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > > >
> > > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > -----Original Message-----
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Dienstag, 12. April 2022 12:37
> > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > with state processor API
> > > >
> > > > Hi Alexis,
> > > >
> > > > Thanks a lot for sharing this. I think the program is correct.
> > > > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > > > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > > >
> > > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > > >
> > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > > > >
> > > > > Some additional information that I’ve gathered:
> > > > >
> > > > >
> > > > >
> > > > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db
> > > > > 678
> > > > >
> > > > >
> > > > >
> > > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > > >
> > > > >
> > > > >
> > > > > state.backend: rocksdb
> > > > >
> > > > > state.backend.incremental: true
> > > > >
> > > > > execution.checkpointing.interval: 30 s
> > > > >
> > > > > execution.checkpointing.min-pause: 25 s
> > > > >
> > > > > execution.checkpointing.timeout: 5 min
> > > > >
> > > > > execution.savepoint-restore-mode: CLAIM
> > > > >
> > > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > > RETAIN_ON_CANCELLATION
> > > > >
> > > > >
> > > > >
> > > > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > From: Alexis Sarda-Espinosa 
> > > > > <al...@microfocus.com>
> > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > To: roman@apache.org
> > > > > Cc: user@flink.apache.org
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > >
> > > > >
> > > > > Here's an example of a WindowReaderFunction:
> > > > >
> > > > >
> > > > >
> > > > >     public class StateReaderFunction extends 
> > > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > >
> > > > >         private static final ListStateDescriptor<Integer> LSD 
> > > > > = new ListStateDescriptor<>(
> > > > >
> > > > >                 "descriptorId",
> > > > >
> > > > >                 Integer.class
> > > > >
> > > > >         );
> > > > >
> > > > >
> > > > >
> > > > >         @Override
> > > > >
> > > > >         public void readWindow(String s, Context<TimeWindow> 
> > > > > context, Iterable<Pojo> elements, Collector<Integer> out) 
> > > > > throws Exception {
> > > > >
> > > > >             int count = 0;
> > > > >
> > > > >             for (Integer i :
> > > > > context.windowState().getListState(LSD).get()) {
> > > > >
> > > > >                 count++;
> > > > >
> > > > >             }
> > > > >
> > > > >             out.collect(count);
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >
> > > > >
> > > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > > >
> > > > >
> > > > >
> > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > > >
> > > > >
> > > > >
> > > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > ________________________________
> > > > >
> > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > To: Alexis Sarda-Espinosa 
> > > > > <al...@microfocus.com>
> > > > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > If I understand correctly, the provided StateProcessor program 
> > > > > gives you the number of stream elements per operator. However, 
> > > > > you mentioned that these operators have collection-type states 
> > > > > (ListState and MapState). That means that per one entry there 
> > > > > can be an arbitrary number of state elements.
> > > > >
> > > > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > > > >
> > > > > > The other operator does override and call clear()
> > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > > >
> > > > > [1]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api
> > > > > /jav
> > > > > a/or
> > > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState
> > > > > -jav
> > > > > a.la
> > > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReade
> > > > > rFun
> > > > > ctio
> > > > > n-
> > > > >
> > > > > [2]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/
> > > > > java
> > > > > /org
> > > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > > html#clear-org.apache.flink.streaming.api.functions.windowing.
> > > > > Proc
> > > > > essW
> > > > > indowFunction.Context-
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > > > > <al...@microfocus.com> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Windows are 11 minutes in size.
> > > > > > Slide time is 1 minute.
> > > > > > Throughput is approximately 20 events per minute.
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have 3 operators with these states:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Window state with ListState<Integer> and no TTL.
> > > > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > > > >
> > > > > >
> > > > > >
> > > > > > 10 entries
> > > > > > 80 entries
> > > > > > 200 entries
> > > > > >
> > > > > >
> > > > > >
> > > > > > I got those numbers with something like this:
> > > > > >
> > > > > >
> > > > > >
> > > > > > savepoint
> > > > > >
> > > > > >         
> > > > > > .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > > Time.minutes(1L)))
> > > > > >
> > > > > >         .process(...)
> > > > > >
> > > > > >         .collect()
> > > > > >
> > > > > >         .parallelStream()
> > > > > >
> > > > > >         .reduce(0, Integer::sum);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
State Processor API works on a higher level and is not aware of any
RocksDB specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

<al...@microfocus.com> wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point anyway. However, going back to the original question, is there no way to gain more insight into this with the state processor API? You've mentioned potential issues (too many states, missing compaction) but, with my admittedly limited understanding of the way RocksDB is used in Flink, I would have thought that such things would be visible when using the state processor. Is there no way for me to "parse" those MANIFEST files with some of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org <us...@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
> <al...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once and used like this:
> >
> > - Window list state: each call to process() executes context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
> >
> > [1] https://stackoverflow.com/a/50510054/5793905
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 19. April 2022 11:48
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for the information,
> >
> > MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
> > Could you please confirm that the number of state names is constant?
> >
> > > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> > That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
> >
> > > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> > The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
> >
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> > >
> > > /shared folder contains keyed state that is shared among different
> > > checkpoints [1]. Most of state should be shared in your case since
> > > you're using keyed state and incremental checkpoints.
> > >
> > > When a checkpoint is loaded, the state that it shares with older
> > > checkpoints is loaded as well. I suggested to load different
> > > checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> > > their states. To prevent the job from discarding the state, it can
> > > either be stopped for some time and then restarted from the latest
> > > checkpoint; or the number of retained checkpoints can be increased
> > > [2]. Copying isn't necessary.
> > >
> > > Besides that, you can also check state sizes of operator in Flink Web
> > > UI (but not the sizes of individual states). If the operators are
> > > chained then their combined state size will be shown. To prevent this,
> > > you can disable chaining [3] (although this will have performance
> > > impact).
> > >
> > > Individual checkpoint folders should be eventually removed (when the
> > > checkpoint is subsumed). However, this is not guaranteed: if there is
> > > any problem during deletion, it will be logged, but the job will not
> > > fail.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> > > eckpoints/#directory-structure
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > > onfig/#state-checkpoints-num-retained
> > > [3]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> > > am/operators/overview/#disable-chaining
> > >
> > > Regards,
> > > Roman
> > >
> > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> > > <al...@microfocus.com> wrote:
> > > >
> > > > Hi Roman,
> > > >
> > > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > > >
> > > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > > >
> > > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > -----Original Message-----
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Dienstag, 12. April 2022 12:37
> > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > > state processor API
> > > >
> > > > Hi Alexis,
> > > >
> > > > Thanks a lot for sharing this. I think the program is correct.
> > > > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > > > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > > >
> > > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > > >
> > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > > > >
> > > > > Some additional information that I’ve gathered:
> > > > >
> > > > >
> > > > >
> > > > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > > > >
> > > > >
> > > > >
> > > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > > >
> > > > >
> > > > >
> > > > > state.backend: rocksdb
> > > > >
> > > > > state.backend.incremental: true
> > > > >
> > > > > execution.checkpointing.interval: 30 s
> > > > >
> > > > > execution.checkpointing.min-pause: 25 s
> > > > >
> > > > > execution.checkpointing.timeout: 5 min
> > > > >
> > > > > execution.savepoint-restore-mode: CLAIM
> > > > >
> > > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > > RETAIN_ON_CANCELLATION
> > > > >
> > > > >
> > > > >
> > > > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > To: roman@apache.org
> > > > > Cc: user@flink.apache.org
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > >
> > > > >
> > > > > Here's an example of a WindowReaderFunction:
> > > > >
> > > > >
> > > > >
> > > > >     public class StateReaderFunction extends
> > > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > >
> > > > >         private static final ListStateDescriptor<Integer> LSD =
> > > > > new ListStateDescriptor<>(
> > > > >
> > > > >                 "descriptorId",
> > > > >
> > > > >                 Integer.class
> > > > >
> > > > >         );
> > > > >
> > > > >
> > > > >
> > > > >         @Override
> > > > >
> > > > >         public void readWindow(String s, Context<TimeWindow>
> > > > > context, Iterable<Pojo> elements, Collector<Integer> out) throws
> > > > > Exception {
> > > > >
> > > > >             int count = 0;
> > > > >
> > > > >             for (Integer i :
> > > > > context.windowState().getListState(LSD).get()) {
> > > > >
> > > > >                 count++;
> > > > >
> > > > >             }
> > > > >
> > > > >             out.collect(count);
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >
> > > > >
> > > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > > >
> > > > >
> > > > >
> > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > > >
> > > > >
> > > > >
> > > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > ________________________________
> > > > >
> > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > If I understand correctly, the provided StateProcessor program
> > > > > gives you the number of stream elements per operator. However, you
> > > > > mentioned that these operators have collection-type states
> > > > > (ListState and MapState). That means that per one entry there can
> > > > > be an arbitrary number of state elements.
> > > > >
> > > > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > > > >
> > > > > > The other operator does override and call clear()
> > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > > >
> > > > > [1]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/jav
> > > > > a/or
> > > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-jav
> > > > > a.la
> > > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFun
> > > > > ctio
> > > > > n-
> > > > >
> > > > > [2]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java
> > > > > /org
> > > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > > html#clear-org.apache.flink.streaming.api.functions.windowing.Proc
> > > > > essW
> > > > > indowFunction.Context-
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > > > <al...@microfocus.com> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Windows are 11 minutes in size.
> > > > > > Slide time is 1 minute.
> > > > > > Throughput is approximately 20 events per minute.
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have 3 operators with these states:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Window state with ListState<Integer> and no TTL.
> > > > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > > > >
> > > > > >
> > > > > >
> > > > > > 10 entries
> > > > > > 80 entries
> > > > > > 200 entries
> > > > > >
> > > > > >
> > > > > >
> > > > > > I got those numbers with something like this:
> > > > > >
> > > > > >
> > > > > >
> > > > > > savepoint
> > > > > >
> > > > > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > > Time.minutes(1L)))
> > > > > >
> > > > > >         .process(...)
> > > > > >
> > > > > >         .collect()
> > > > > >
> > > > > >         .parallelStream()
> > > > > >
> > > > > >         .reduce(0, Integer::sum);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
I can look into RocksDB metrics, I need to configure Prometheus at some point anyway. However, going back to the original question, is there no way to gain more insight into this with the state processor API? You've mentioned potential issues (too many states, missing compaction) but, with my admittedly limited understanding of the way RocksDB is used in Flink, I would have thought that such things would be visible when using the state processor. Is there no way for me to "parse" those MANIFEST files with some of Flink's classes and get some more hints?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Tuesday, April 19, 2022 5:51 PM
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

> I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and used like this:
>
> - Window list state: each call to process() executes context.windowState().getListState(...).get()
> - Global map state: each call to process() executes context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Dienstag, 19. April 2022 11:48
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggested to load different
> > checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> > their states. To prevent the job from discarding the state, it can
> > either be stopped for some time and then restarted from the latest
> > checkpoint; or the number of retained checkpoints can be increased
> > [2]. Copying isn't necessary.
> >
> > Besides that, you can also check state sizes of operator in Flink Web
> > UI (but not the sizes of individual states). If the operators are
> > chained then their combined state size will be shown. To prevent this,
> > you can disable chaining [3] (although this will have performance
> > impact).
> >
> > Individual checkpoint folders should be eventually removed (when the
> > checkpoint is subsumed). However, this is not guaranteed: if there is
> > any problem during deletion, it will be logged, but the job will not
> > fail.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> > eckpoints/#directory-structure
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-checkpoints-num-retained
> > [3]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> > am/operators/overview/#disable-chaining
> >
> > Regards,
> > Roman
> >
> > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> > <al...@microfocus.com> wrote:
> > >
> > > Hi Roman,
> > >
> > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > >
> > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > >
> > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > >
> > > Regards,
> > > Alexis.
> > >
> > > -----Original Message-----
> > > From: Roman Khachatryan <ro...@apache.org>
> > > Sent: Dienstag, 12. April 2022 12:37
> > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Cc: user@flink.apache.org
> > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > state processor API
> > >
> > > Hi Alexis,
> > >
> > > Thanks a lot for sharing this. I think the program is correct.
> > > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > >
> > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > >
> > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > >
> > > Regards,
> > > Roman
> > >
> > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > > >
> > > > Some additional information that I’ve gathered:
> > > >
> > > >
> > > >
> > > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > > >
> > > >
> > > >
> > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > >
> > > >
> > > >
> > > > state.backend: rocksdb
> > > >
> > > > state.backend.incremental: true
> > > >
> > > > execution.checkpointing.interval: 30 s
> > > >
> > > > execution.checkpointing.min-pause: 25 s
> > > >
> > > > execution.checkpointing.timeout: 5 min
> > > >
> > > > execution.savepoint-restore-mode: CLAIM
> > > >
> > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > RETAIN_ON_CANCELLATION
> > > >
> > > >
> > > >
> > > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >
> > > >
> > > > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Sent: Samstag, 9. April 2022 01:39
> > > > To: roman@apache.org
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > with state processor API
> > > >
> > > >
> > > >
> > > > Hi Roman,
> > > >
> > > >
> > > >
> > > > Here's an example of a WindowReaderFunction:
> > > >
> > > >
> > > >
> > > >     public class StateReaderFunction extends
> > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > >
> > > >         private static final ListStateDescriptor<Integer> LSD =
> > > > new ListStateDescriptor<>(
> > > >
> > > >                 "descriptorId",
> > > >
> > > >                 Integer.class
> > > >
> > > >         );
> > > >
> > > >
> > > >
> > > >         @Override
> > > >
> > > >         public void readWindow(String s, Context<TimeWindow>
> > > > context, Iterable<Pojo> elements, Collector<Integer> out) throws
> > > > Exception {
> > > >
> > > >             int count = 0;
> > > >
> > > >             for (Integer i :
> > > > context.windowState().getListState(LSD).get()) {
> > > >
> > > >                 count++;
> > > >
> > > >             }
> > > >
> > > >             out.collect(count);
> > > >
> > > >         }
> > > >
> > > >     }
> > > >
> > > >
> > > >
> > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > >
> > > >
> > > >
> > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > >
> > > >
> > > >
> > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >
> > > >
> > > > ________________________________
> > > >
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > with state processor API
> > > >
> > > >
> > > >
> > > > Hi Alexis,
> > > >
> > > > If I understand correctly, the provided StateProcessor program
> > > > gives you the number of stream elements per operator. However, you
> > > > mentioned that these operators have collection-type states
> > > > (ListState and MapState). That means that per one entry there can
> > > > be an arbitrary number of state elements.
> > > >
> > > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > > >
> > > > > The other operator does override and call clear()
> > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > >
> > > > [1]
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/jav
> > > > a/or
> > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-jav
> > > > a.la
> > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFun
> > > > ctio
> > > > n-
> > > >
> > > > [2]
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java
> > > > /org
> > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > html#clear-org.apache.flink.streaming.api.functions.windowing.Proc
> > > > essW
> > > > indowFunction.Context-
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > > <al...@microfocus.com> wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > >
> > > > >
> > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > >
> > > > >
> > > > >
> > > > > Windows are 11 minutes in size.
> > > > > Slide time is 1 minute.
> > > > > Throughput is approximately 20 events per minute.
> > > > >
> > > > >
> > > > >
> > > > > I have 3 operators with these states:
> > > > >
> > > > >
> > > > >
> > > > > Window state with ListState<Integer> and no TTL.
> > > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > >
> > > > >
> > > > >
> > > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > > >
> > > > >
> > > > >
> > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > > >
> > > > >
> > > > >
> > > > > 10 entries
> > > > > 80 entries
> > > > > 200 entries
> > > > >
> > > > >
> > > > >
> > > > > I got those numbers with something like this:
> > > > >
> > > > >
> > > > >
> > > > > savepoint
> > > > >
> > > > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > Time.minutes(1L)))
> > > > >
> > > > >         .process(...)
> > > > >
> > > > >         .collect()
> > > > >
> > > > >         .parallelStream()
> > > > >
> > > > >         .reduce(0, Integer::sum);
> > > > >
> > > > >
> > > > >
> > > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > >
> > > > >
> > > > >
> > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
> I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and used like this:
>
> - Window list state: each call to process() executes context.windowState().getListState(...).get()
> - Global map state: each call to process() executes context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Dienstag, 19. April 2022 11:48
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggested to load different
> > checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> > their states. To prevent the job from discarding the state, it can
> > either be stopped for some time and then restarted from the latest
> > checkpoint; or the number of retained checkpoints can be increased
> > [2]. Copying isn't necessary.
> >
> > Besides that, you can also check state sizes of operator in Flink Web
> > UI (but not the sizes of individual states). If the operators are
> > chained then their combined state size will be shown. To prevent this,
> > you can disable chaining [3] (although this will have performance
> > impact).
> >
> > Individual checkpoint folders should be eventually removed (when the
> > checkpoint is subsumed). However, this is not guaranteed: if there is
> > any problem during deletion, it will be logged, but the job will not
> > fail.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> > eckpoints/#directory-structure
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-checkpoints-num-retained
> > [3]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> > am/operators/overview/#disable-chaining
> >
> > Regards,
> > Roman
> >
> > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> > <al...@microfocus.com> wrote:
> > >
> > > Hi Roman,
> > >
> > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > >
> > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > >
> > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > >
> > > Regards,
> > > Alexis.
> > >
> > > -----Original Message-----
> > > From: Roman Khachatryan <ro...@apache.org>
> > > Sent: Dienstag, 12. April 2022 12:37
> > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Cc: user@flink.apache.org
> > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > state processor API
> > >
> > > Hi Alexis,
> > >
> > > Thanks a lot for sharing this. I think the program is correct.
> > > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > >
> > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > >
> > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > >
> > > Regards,
> > > Roman
> > >
> > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > > >
> > > > Some additional information that I’ve gathered:
> > > >
> > > >
> > > >
> > > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > > >
> > > >
> > > >
> > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > >
> > > >
> > > >
> > > > state.backend: rocksdb
> > > >
> > > > state.backend.incremental: true
> > > >
> > > > execution.checkpointing.interval: 30 s
> > > >
> > > > execution.checkpointing.min-pause: 25 s
> > > >
> > > > execution.checkpointing.timeout: 5 min
> > > >
> > > > execution.savepoint-restore-mode: CLAIM
> > > >
> > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > RETAIN_ON_CANCELLATION
> > > >
> > > >
> > > >
> > > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >
> > > >
> > > > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Sent: Samstag, 9. April 2022 01:39
> > > > To: roman@apache.org
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > with state processor API
> > > >
> > > >
> > > >
> > > > Hi Roman,
> > > >
> > > >
> > > >
> > > > Here's an example of a WindowReaderFunction:
> > > >
> > > >
> > > >
> > > >     public class StateReaderFunction extends
> > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > >
> > > >         private static final ListStateDescriptor<Integer> LSD =
> > > > new ListStateDescriptor<>(
> > > >
> > > >                 "descriptorId",
> > > >
> > > >                 Integer.class
> > > >
> > > >         );
> > > >
> > > >
> > > >
> > > >         @Override
> > > >
> > > >         public void readWindow(String s, Context<TimeWindow>
> > > > context, Iterable<Pojo> elements, Collector<Integer> out) throws
> > > > Exception {
> > > >
> > > >             int count = 0;
> > > >
> > > >             for (Integer i :
> > > > context.windowState().getListState(LSD).get()) {
> > > >
> > > >                 count++;
> > > >
> > > >             }
> > > >
> > > >             out.collect(count);
> > > >
> > > >         }
> > > >
> > > >     }
> > > >
> > > >
> > > >
> > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > >
> > > >
> > > >
> > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > >
> > > >
> > > >
> > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >
> > > >
> > > > ________________________________
> > > >
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen
> > > > with state processor API
> > > >
> > > >
> > > >
> > > > Hi Alexis,
> > > >
> > > > If I understand correctly, the provided StateProcessor program
> > > > gives you the number of stream elements per operator. However, you
> > > > mentioned that these operators have collection-type states
> > > > (ListState and MapState). That means that per one entry there can
> > > > be an arbitrary number of state elements.
> > > >
> > > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > > >
> > > > > The other operator does override and call clear()
> > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > >
> > > > [1]
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/jav
> > > > a/or
> > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-jav
> > > > a.la
> > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFun
> > > > ctio
> > > > n-
> > > >
> > > > [2]
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java
> > > > /org
> > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > html#clear-org.apache.flink.streaming.api.functions.windowing.Proc
> > > > essW
> > > > indowFunction.Context-
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > > <al...@microfocus.com> wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > >
> > > > >
> > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > >
> > > > >
> > > > >
> > > > > Windows are 11 minutes in size.
> > > > > Slide time is 1 minute.
> > > > > Throughput is approximately 20 events per minute.
> > > > >
> > > > >
> > > > >
> > > > > I have 3 operators with these states:
> > > > >
> > > > >
> > > > >
> > > > > Window state with ListState<Integer> and no TTL.
> > > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > >
> > > > >
> > > > >
> > > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > > >
> > > > >
> > > > >
> > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > > >
> > > > >
> > > > >
> > > > > 10 entries
> > > > > 80 entries
> > > > > 200 entries
> > > > >
> > > > >
> > > > >
> > > > > I got those numbers with something like this:
> > > > >
> > > > >
> > > > >
> > > > > savepoint
> > > > >
> > > > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > Time.minutes(1L)))
> > > > >
> > > > >         .process(...)
> > > > >
> > > > >         .collect()
> > > > >
> > > > >         .parallelStream()
> > > > >
> > > > >         .reduce(0, Integer::sum);
> > > > >
> > > > >
> > > > >
> > > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > >
> > > > >
> > > > >
> > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Roman,

I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?

For the 3 descriptors I mentioned before, they are only instantiated once and used like this:

- Window list state: each call to process() executes context.windowState().getListState(...).get()
- Global map state: each call to process() executes context.globalState().getMapState(...)
- Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.

According to [1], the two ways of using global state should be equivalent.

I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.

[1] https://stackoverflow.com/a/50510054/5793905

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Dienstag, 19. April 2022 11:48
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); ever growing size of these files might indicate that some new states are constantly being created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
That's correct, window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
>
> /shared folder contains keyed state that is shared among different 
> checkpoints [1]. Most of state should be shared in your case since 
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older 
> checkpoints is loaded as well. I suggested to load different 
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in 
> their states. To prevent the job from discarding the state, it can 
> either be stopped for some time and then restarted from the latest 
> checkpoint; or the number of retained checkpoints can be increased 
> [2]. Copying isn't necessary.
>
> Besides that, you can also check state sizes of operator in Flink Web 
> UI (but not the sizes of individual states). If the operators are 
> chained then their combined state size will be shown. To prevent this, 
> you can disable chaining [3] (although this will have performance 
> impact).
>
> Individual checkpoint folders should be eventually removed (when the 
> checkpoint is subsumed). However, this is not guaranteed: if there is 
> any problem during deletion, it will be logged, but the job will not 
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> eckpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> am/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
> <al...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> >
> > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 12. April 2022 12:37
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for sharing this. I think the program is correct.
> > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> >
> > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> >
> > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> >
> > Regards,
> > Roman
> >
> > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > >
> > > Some additional information that I’ve gathered:
> > >
> > >
> > >
> > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > GitHub gist with the whole processor setup since it’s not too long:
> > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > >
> > >
> > >
> > > Relevant configuration entries (explicitly set, others are left with defaults):
> > >
> > >
> > >
> > > state.backend: rocksdb
> > >
> > > state.backend.incremental: true
> > >
> > > execution.checkpointing.interval: 30 s
> > >
> > > execution.checkpointing.min-pause: 25 s
> > >
> > > execution.checkpointing.timeout: 5 min
> > >
> > > execution.savepoint-restore-mode: CLAIM
> > >
> > > execution.checkpointing.externalized-checkpoint-retention:
> > > RETAIN_ON_CANCELLATION
> > >
> > >
> > >
> > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >
> > >
> > > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Sent: Samstag, 9. April 2022 01:39
> > > To: roman@apache.org
> > > Cc: user@flink.apache.org
> > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > with state processor API
> > >
> > >
> > >
> > > Hi Roman,
> > >
> > >
> > >
> > > Here's an example of a WindowReaderFunction:
> > >
> > >
> > >
> > >     public class StateReaderFunction extends 
> > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > >
> > >         private static final ListStateDescriptor<Integer> LSD = 
> > > new ListStateDescriptor<>(
> > >
> > >                 "descriptorId",
> > >
> > >                 Integer.class
> > >
> > >         );
> > >
> > >
> > >
> > >         @Override
> > >
> > >         public void readWindow(String s, Context<TimeWindow> 
> > > context, Iterable<Pojo> elements, Collector<Integer> out) throws 
> > > Exception {
> > >
> > >             int count = 0;
> > >
> > >             for (Integer i :
> > > context.windowState().getListState(LSD).get()) {
> > >
> > >                 count++;
> > >
> > >             }
> > >
> > >             out.collect(count);
> > >
> > >         }
> > >
> > >     }
> > >
> > >
> > >
> > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > >
> > >
> > >
> > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > >
> > >
> > >
> > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >
> > >
> > > ________________________________
> > >
> > > From: Roman Khachatryan <ro...@apache.org>
> > > Sent: Friday, April 8, 2022 11:06 PM
> > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > with state processor API
> > >
> > >
> > >
> > > Hi Alexis,
> > >
> > > If I understand correctly, the provided StateProcessor program 
> > > gives you the number of stream elements per operator. However, you 
> > > mentioned that these operators have collection-type states 
> > > (ListState and MapState). That means that per one entry there can 
> > > be an arbitrary number of state elements.
> > >
> > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > >
> > > > The other operator does override and call clear()
> > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/jav
> > > a/or 
> > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-jav
> > > a.la 
> > > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFun
> > > ctio
> > > n-
> > >
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java
> > > /org 
> > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > html#clear-org.apache.flink.streaming.api.functions.windowing.Proc
> > > essW
> > > indowFunction.Context-
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > > <al...@microfocus.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > >
> > > >
> > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > >
> > > >
> > > >
> > > > Windows are 11 minutes in size.
> > > > Slide time is 1 minute.
> > > > Throughput is approximately 20 events per minute.
> > > >
> > > >
> > > >
> > > > I have 3 operators with these states:
> > > >
> > > >
> > > >
> > > > Window state with ListState<Integer> and no TTL.
> > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > >
> > > >
> > > >
> > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > >
> > > >
> > > >
> > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > >
> > > >
> > > >
> > > > 10 entries
> > > > 80 entries
> > > > 200 entries
> > > >
> > > >
> > > >
> > > > I got those numbers with something like this:
> > > >
> > > >
> > > >
> > > > savepoint
> > > >
> > > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > Time.minutes(1L)))
> > > >
> > > >         .process(...)
> > > >
> > > >         .collect()
> > > >
> > > >         .parallelStream()
> > > >
> > > >         .reduce(0, Integer::sum);
> > > >
> > > >
> > > >
> > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > >
> > > >
> > > >
> > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); ever
growing size of these files might indicate that some new states are
constantly being created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
That's correct, window operators maintain a list of elements per
window and a set of timers (timestamps). These states' names should be
fixed (something like "window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator
state, so it's hard to say without at least knowing the state names.


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
>
> /shared folder contains keyed state that is shared among different
> checkpoints [1]. Most of state should be shared in your case since
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older
> checkpoints is loaded as well. I suggested to load different
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> their states. To prevent the job from discarding the state, it can
> either be stopped for some time and then restarted from the latest
> checkpoint; or the number of retained checkpoints can be increased
> [2]. Copying isn't necessary.
>
> Besides that, you can also check state sizes of operator in Flink Web
> UI (but not the sizes of individual states). If the operators are
> chained then their combined state size will be shown. To prevent this,
> you can disable chaining [3] (although this will have performance
> impact).
>
> Individual checkpoint folders should be eventually removed (when the
> checkpoint is subsumed). However, this is not guaranteed: if there is
> any problem during deletion, it will be logged, but the job will not
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
> <al...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> >
> > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 12. April 2022 12:37
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for sharing this. I think the program is correct.
> > Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> > But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> >
> > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> >
> > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> >
> > Regards,
> > Roman
> >
> > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> > >
> > > Some additional information that I’ve gathered:
> > >
> > >
> > >
> > > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > GitHub gist with the whole processor setup since it’s not too long:
> > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > >
> > >
> > >
> > > Relevant configuration entries (explicitly set, others are left with defaults):
> > >
> > >
> > >
> > > state.backend: rocksdb
> > >
> > > state.backend.incremental: true
> > >
> > > execution.checkpointing.interval: 30 s
> > >
> > > execution.checkpointing.min-pause: 25 s
> > >
> > > execution.checkpointing.timeout: 5 min
> > >
> > > execution.savepoint-restore-mode: CLAIM
> > >
> > > execution.checkpointing.externalized-checkpoint-retention:
> > > RETAIN_ON_CANCELLATION
> > >
> > >
> > >
> > > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >
> > >
> > > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Sent: Samstag, 9. April 2022 01:39
> > > To: roman@apache.org
> > > Cc: user@flink.apache.org
> > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > state processor API
> > >
> > >
> > >
> > > Hi Roman,
> > >
> > >
> > >
> > > Here's an example of a WindowReaderFunction:
> > >
> > >
> > >
> > >     public class StateReaderFunction extends
> > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > >
> > >         private static final ListStateDescriptor<Integer> LSD = new
> > > ListStateDescriptor<>(
> > >
> > >                 "descriptorId",
> > >
> > >                 Integer.class
> > >
> > >         );
> > >
> > >
> > >
> > >         @Override
> > >
> > >         public void readWindow(String s, Context<TimeWindow> context,
> > > Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
> > >
> > >             int count = 0;
> > >
> > >             for (Integer i :
> > > context.windowState().getListState(LSD).get()) {
> > >
> > >                 count++;
> > >
> > >             }
> > >
> > >             out.collect(count);
> > >
> > >         }
> > >
> > >     }
> > >
> > >
> > >
> > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > >
> > >
> > >
> > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > >
> > >
> > >
> > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >
> > >
> > > ________________________________
> > >
> > > From: Roman Khachatryan <ro...@apache.org>
> > > Sent: Friday, April 8, 2022 11:06 PM
> > > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > > Cc: user@flink.apache.org <us...@flink.apache.org>
> > > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > > state processor API
> > >
> > >
> > >
> > > Hi Alexis,
> > >
> > > If I understand correctly, the provided StateProcessor program gives
> > > you the number of stream elements per operator. However, you mentioned
> > > that these operators have collection-type states (ListState and
> > > MapState). That means that per one entry there can be an arbitrary
> > > number of state elements.
> > >
> > > Have you tried estimating the state sizes directly via readKeyedState[1]?
> > >
> > > > The other operator does override and call clear()
> > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/or
> > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.la
> > > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFunctio
> > > n-
> > >
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org
> > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessW
> > > indowFunction.Context-
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > > <al...@microfocus.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > >
> > > >
> > > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > >
> > > >
> > > >
> > > > Windows are 11 minutes in size.
> > > > Slide time is 1 minute.
> > > > Throughput is approximately 20 events per minute.
> > > >
> > > >
> > > >
> > > > I have 3 operators with these states:
> > > >
> > > >
> > > >
> > > > Window state with ListState<Integer> and no TTL.
> > > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > >
> > > >
> > > >
> > > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > > >
> > > >
> > > >
> > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > > >
> > > >
> > > >
> > > > 10 entries
> > > > 80 entries
> > > > 200 entries
> > > >
> > > >
> > > >
> > > > I got those numbers with something like this:
> > > >
> > > >
> > > >
> > > > savepoint
> > > >
> > > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > Time.minutes(1L)))
> > > >
> > > >         .process(...)
> > > >
> > > >         .collect()
> > > >
> > > >         .parallelStream()
> > > >
> > > >         .reduce(0, Integer::sum);
> > > >
> > > >
> > > >
> > > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > >
> > > >
> > > >
> > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Alexis.
> > > >
> > > >

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hello,

There was a network issue in my environment and the job had to restart. After the job came back up, the logs showed a lot of lines like this:

RocksDBIncrementalRestoreOperation ... Starting to restore from state handle: ...

Interestingly, those entries include information about sizes in bytes:

- 445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/checkpoints/00000000000000000000000000000000/shared/18f95afa-dc66-467d-bd05-779895f24960', dataBytes=1328}
- privateState={MANIFEST-000004=File State: file:/opt/flink/state/checkpoints/00000000000000000000000000000000/shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d [80921331 bytes]

I extracted a lot of that information and I can see that:

- If I sum all dataBytes from sharedState, that only accounts for a couple MB.
- Most of the state comes from privateState, specifically from the entries referring to MANIFEST File State; that accounts for almost 1.5GB.

I believe that is one of the files RocksDB uses internally, but is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?

Regards,
Alexis.

-----Original Message-----
From: Alexis Sarda-Espinosa <al...@microfocus.com> 
Sent: Dienstag, 12. April 2022 15:39
To: roman@apache.org
Cc: user@flink.apache.org
Subject: RE: RocksDB's state size discrepancy with what's seen with state processor API

Thanks for all the pointers. The UI does show combined state for a chain, but the only state descriptors inside that chain are the 3 I mentioned before. Its size is still increasing today, and duration is now around 30 seconds (I can't use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but I don't see anything weird. The counts go up and down depending on which one I load, but even the bigger ones have around 500-700 entries, which should only be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

    timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
    windowedStream      = timestampedStream -> reinterpretAsKeyedStream -> window (SlidingEventTimeWindows)
    windowedStream -> process1 -> sink1
    windowedStream -> process2 -> sink2
    windowedStream -> process3 -> map

And according to the Low Watermark I see in the UI, event time is advancing correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org>
Sent: Dienstag, 12. April 2022 14:06
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

/shared folder contains keyed state that is shared among different checkpoints [1]. Most of state should be shared in your case since you're using keyed state and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) and compare the numbers of objects in their states. To prevent the job from discarding the state, it can either be stopped for some time and then restarted from the latest checkpoint; or the number of retained checkpoints can be increased [2]. Copying isn't necessary.

Besides that, you can also check state sizes of operator in Flink Web UI (but not the sizes of individual states). If the operators are chained then their combined state size will be shown. To prevent this, you can disable chaining [3] (although this will have performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint is subsumed). However, this is not guaranteed: if there is any problem during deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Dienstag, 12. April 2022 12:37
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
>
> If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
>
> Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
>
> Regards,
> Roman
>
> On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> >
> > Some additional information that I’ve gathered:
> >
> >
> >
> > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > GitHub gist with the whole processor setup since it’s not too long:
> > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> >
> >
> >
> > Relevant configuration entries (explicitly set, others are left with defaults):
> >
> >
> >
> > state.backend: rocksdb
> >
> > state.backend.incremental: true
> >
> > execution.checkpointing.interval: 30 s
> >
> > execution.checkpointing.min-pause: 25 s
> >
> > execution.checkpointing.timeout: 5 min
> >
> > execution.savepoint-restore-mode: CLAIM
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> >
> >
> >
> > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Sent: Samstag, 9. April 2022 01:39
> > To: roman@apache.org
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Roman,
> >
> >
> >
> > Here's an example of a WindowReaderFunction:
> >
> >
> >
> >     public class StateReaderFunction extends 
> > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> >
> >         private static final ListStateDescriptor<Integer> LSD = new 
> > ListStateDescriptor<>(
> >
> >                 "descriptorId",
> >
> >                 Integer.class
> >
> >         );
> >
> >
> >
> >         @Override
> >
> >         public void readWindow(String s, Context<TimeWindow> 
> > context, Iterable<Pojo> elements, Collector<Integer> out) throws 
> > Exception {
> >
> >             int count = 0;
> >
> >             for (Integer i :
> > context.windowState().getListState(LSD).get()) {
> >
> >                 count++;
> >
> >             }
> >
> >             out.collect(count);
> >
> >         }
> >
> >     }
> >
> >
> >
> > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> >
> >
> >
> > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> >
> >
> >
> > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > ________________________________
> >
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Friday, April 8, 2022 11:06 PM
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org <us...@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Alexis,
> >
> > If I understand correctly, the provided StateProcessor program gives 
> > you the number of stream elements per operator. However, you 
> > mentioned that these operators have collection-type states 
> > (ListState and MapState). That means that per one entry there can be 
> > an arbitrary number of state elements.
> >
> > Have you tried estimating the state sizes directly via readKeyedState[1]?
> >
> > > The other operator does override and call clear()
> > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/
> > or
> > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.
> > la
> > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFunct
> > io
> > n-
> >
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/o
> > rg
> > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > html#clear-org.apache.flink.streaming.api.functions.windowing.Proces
> > sW
> > indowFunction.Context-
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > <al...@microfocus.com> wrote:
> > >
> > > Hello,
> > >
> > >
> > >
> > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > >
> > >
> > >
> > > Windows are 11 minutes in size.
> > > Slide time is 1 minute.
> > > Throughput is approximately 20 events per minute.
> > >
> > >
> > >
> > > I have 3 operators with these states:
> > >
> > >
> > >
> > > Window state with ListState<Integer> and no TTL.
> > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > >
> > >
> > >
> > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > >
> > >
> > >
> > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > >
> > >
> > >
> > > 10 entries
> > > 80 entries
> > > 200 entries
> > >
> > >
> > >
> > > I got those numbers with something like this:
> > >
> > >
> > >
> > > savepoint
> > >
> > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > Time.minutes(1L)))
> > >
> > >         .process(...)
> > >
> > >         .collect()
> > >
> > >         .parallelStream()
> > >
> > >         .reduce(0, Integer::sum);
> > >
> > >
> > >
> > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > >
> > >
> > >
> > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Thanks for all the pointers. The UI does show combined state for a chain, but the only state descriptors inside that chain are the 3 I mentioned before. Its size is still increasing today, and duration is now around 30 seconds (I can't use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but I don't see anything weird. The counts go up and down depending on which one I load, but even the bigger ones have around 500-700 entries, which should only be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

    timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
    windowedStream      = timestampedStream -> reinterpretAsKeyedStream -> window (SlidingEventTimeWindows)
    windowedStream -> process1 -> sink1
    windowedStream -> process2 -> sink2
    windowedStream -> process3 -> map

And according to the Low Watermark I see in the UI, event time is advancing correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Dienstag, 12. April 2022 14:06
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

/shared folder contains keyed state that is shared among different checkpoints [1]. Most of state should be shared in your case since you're using keyed state and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) and compare the numbers of objects in their states. To prevent the job from discarding the state, it can either be stopped for some time and then restarted from the latest checkpoint; or the number of retained checkpoints can be increased [2]. Copying isn't necessary.

Besides that, you can also check state sizes of operator in Flink Web UI (but not the sizes of individual states). If the operators are chained then their combined state size will be shown. To prevent this, you can disable chaining [3] (although this will have performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint is subsumed). However, this is not guaranteed: if there is any problem during deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Dienstag, 12. April 2022 12:37
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
>
> If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
>
> Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
>
> Regards,
> Roman
>
> On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> >
> > Some additional information that I’ve gathered:
> >
> >
> >
> > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > GitHub gist with the whole processor setup since it’s not too long:
> > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> >
> >
> >
> > Relevant configuration entries (explicitly set, others are left with defaults):
> >
> >
> >
> > state.backend: rocksdb
> >
> > state.backend.incremental: true
> >
> > execution.checkpointing.interval: 30 s
> >
> > execution.checkpointing.min-pause: 25 s
> >
> > execution.checkpointing.timeout: 5 min
> >
> > execution.savepoint-restore-mode: CLAIM
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> >
> >
> >
> > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Sent: Samstag, 9. April 2022 01:39
> > To: roman@apache.org
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Roman,
> >
> >
> >
> > Here's an example of a WindowReaderFunction:
> >
> >
> >
> >     public class StateReaderFunction extends 
> > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> >
> >         private static final ListStateDescriptor<Integer> LSD = new 
> > ListStateDescriptor<>(
> >
> >                 "descriptorId",
> >
> >                 Integer.class
> >
> >         );
> >
> >
> >
> >         @Override
> >
> >         public void readWindow(String s, Context<TimeWindow> 
> > context, Iterable<Pojo> elements, Collector<Integer> out) throws 
> > Exception {
> >
> >             int count = 0;
> >
> >             for (Integer i :
> > context.windowState().getListState(LSD).get()) {
> >
> >                 count++;
> >
> >             }
> >
> >             out.collect(count);
> >
> >         }
> >
> >     }
> >
> >
> >
> > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> >
> >
> >
> > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> >
> >
> >
> > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > ________________________________
> >
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Friday, April 8, 2022 11:06 PM
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org <us...@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Alexis,
> >
> > If I understand correctly, the provided StateProcessor program gives 
> > you the number of stream elements per operator. However, you 
> > mentioned that these operators have collection-type states 
> > (ListState and MapState). That means that per one entry there can be 
> > an arbitrary number of state elements.
> >
> > Have you tried estimating the state sizes directly via readKeyedState[1]?
> >
> > > The other operator does override and call clear()
> > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/
> > or 
> > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.
> > la 
> > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFunct
> > io
> > n-
> >
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/o
> > rg 
> > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > html#clear-org.apache.flink.streaming.api.functions.windowing.Proces
> > sW
> > indowFunction.Context-
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > <al...@microfocus.com> wrote:
> > >
> > > Hello,
> > >
> > >
> > >
> > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > >
> > >
> > >
> > > Windows are 11 minutes in size.
> > > Slide time is 1 minute.
> > > Throughput is approximately 20 events per minute.
> > >
> > >
> > >
> > > I have 3 operators with these states:
> > >
> > >
> > >
> > > Window state with ListState<Integer> and no TTL.
> > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > >
> > >
> > >
> > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > >
> > >
> > >
> > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > >
> > >
> > >
> > > 10 entries
> > > 80 entries
> > > 200 entries
> > >
> > >
> > >
> > > I got those numbers with something like this:
> > >
> > >
> > >
> > > savepoint
> > >
> > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > Time.minutes(1L)))
> > >
> > >         .process(...)
> > >
> > >         .collect()
> > >
> > >         .parallelStream()
> > >
> > >         .reduce(0, Integer::sum);
> > >
> > >
> > >
> > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > >
> > >
> > >
> > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
/shared folder contains keyed state that is shared among different
checkpoints [1]. Most of state should be shared in your case since
you're using keyed state and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older
checkpoints is loaded as well. I suggested to load different
checkpoints (i.e. chk-* folders) and compare the numbers of objects in
their states. To prevent the job from discarding the state, it can
either be stopped for some time and then restarted from the latest
checkpoint; or the number of retained checkpoints can be increased
[2]. Copying isn't necessary.

Besides that, you can also check state sizes of operator in Flink Web
UI (but not the sizes of individual states). If the operators are
chained then their combined state size will be shown. To prevent this,
you can disable chaining [3] (although this will have performance
impact).

Individual checkpoint folders should be eventually removed (when the
checkpoint is subsumed). However, this is not guaranteed: if there is
any problem during deletion, it will be logged, but the job will not
fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Dienstag, 12. April 2022 12:37
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
> But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
>
> If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
>
> Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
>
> Regards,
> Roman
>
> On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
> >
> > Some additional information that I’ve gathered:
> >
> >
> >
> > The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > GitHub gist with the whole processor setup since it’s not too long:
> > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> >
> >
> >
> > Relevant configuration entries (explicitly set, others are left with defaults):
> >
> >
> >
> > state.backend: rocksdb
> >
> > state.backend.incremental: true
> >
> > execution.checkpointing.interval: 30 s
> >
> > execution.checkpointing.min-pause: 25 s
> >
> > execution.checkpointing.timeout: 5 min
> >
> > execution.savepoint-restore-mode: CLAIM
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> >
> >
> >
> > Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > From: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Sent: Samstag, 9. April 2022 01:39
> > To: roman@apache.org
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > state processor API
> >
> >
> >
> > Hi Roman,
> >
> >
> >
> > Here's an example of a WindowReaderFunction:
> >
> >
> >
> >     public class StateReaderFunction extends
> > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> >
> >         private static final ListStateDescriptor<Integer> LSD = new
> > ListStateDescriptor<>(
> >
> >                 "descriptorId",
> >
> >                 Integer.class
> >
> >         );
> >
> >
> >
> >         @Override
> >
> >         public void readWindow(String s, Context<TimeWindow> context,
> > Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
> >
> >             int count = 0;
> >
> >             for (Integer i :
> > context.windowState().getListState(LSD).get()) {
> >
> >                 count++;
> >
> >             }
> >
> >             out.collect(count);
> >
> >         }
> >
> >     }
> >
> >
> >
> > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> >
> >
> >
> > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> >
> >
> >
> > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > ________________________________
> >
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Friday, April 8, 2022 11:06 PM
> > To: Alexis Sarda-Espinosa <al...@microfocus.com>
> > Cc: user@flink.apache.org <us...@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > state processor API
> >
> >
> >
> > Hi Alexis,
> >
> > If I understand correctly, the provided StateProcessor program gives
> > you the number of stream elements per operator. However, you mentioned
> > that these operators have collection-type states (ListState and
> > MapState). That means that per one entry there can be an arbitrary
> > number of state elements.
> >
> > Have you tried estimating the state sizes directly via readKeyedState[1]?
> >
> > > The other operator does override and call clear()
> > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/or
> > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.la
> > ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFunctio
> > n-
> >
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org
> > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessW
> > indowFunction.Context-
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> > <al...@microfocus.com> wrote:
> > >
> > > Hello,
> > >
> > >
> > >
> > > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > >
> > >
> > >
> > > Windows are 11 minutes in size.
> > > Slide time is 1 minute.
> > > Throughput is approximately 20 events per minute.
> > >
> > >
> > >
> > > I have 3 operators with these states:
> > >
> > >
> > >
> > > Window state with ListState<Integer> and no TTL.
> > > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > >
> > >
> > >
> > > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> > >
> > >
> > >
> > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> > >
> > >
> > >
> > > 10 entries
> > > 80 entries
> > > 200 entries
> > >
> > >
> > >
> > > I got those numbers with something like this:
> > >
> > >
> > >
> > > savepoint
> > >
> > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > Time.minutes(1L)))
> > >
> > >         .process(...)
> > >
> > >         .collect()
> > >
> > >         .parallelStream()
> > >
> > >         .reduce(0, Integer::sum);
> > >
> > >
> > >
> > > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > >
> > >
> > >
> > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Roman,

Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints, I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?

I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged, there's only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.

Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around?. My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Dienstag, 12. April 2022 12:37
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Alexis,

Thanks a lot for sharing this. I think the program is correct.
Although it doesn't take timers into account; and to estimate the state size more accurately, you could also use the same serializers used by the job.
But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.

If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.

Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.

Regards,
Roman

On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <al...@microfocus.com> wrote:
>
> Some additional information that I’ve gathered:
>
>
>
> The number of unique keys in the system is 10, and that is correctly reflected in the state.
> TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> GitHub gist with the whole processor setup since it’s not too long: 
> https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
>
>
>
> Relevant configuration entries (explicitly set, others are left with defaults):
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> execution.checkpointing.interval: 30 s
>
> execution.checkpointing.min-pause: 25 s
>
> execution.checkpointing.timeout: 5 min
>
> execution.savepoint-restore-mode: CLAIM
>
> execution.checkpointing.externalized-checkpoint-retention: 
> RETAIN_ON_CANCELLATION
>
>
>
> Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> From: Alexis Sarda-Espinosa <al...@microfocus.com>
> Sent: Samstag, 9. April 2022 01:39
> To: roman@apache.org
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
>
>
> Hi Roman,
>
>
>
> Here's an example of a WindowReaderFunction:
>
>
>
>     public class StateReaderFunction extends 
> WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
>
>         private static final ListStateDescriptor<Integer> LSD = new 
> ListStateDescriptor<>(
>
>                 "descriptorId",
>
>                 Integer.class
>
>         );
>
>
>
>         @Override
>
>         public void readWindow(String s, Context<TimeWindow> context, 
> Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
>
>             int count = 0;
>
>             for (Integer i : 
> context.windowState().getListState(LSD).get()) {
>
>                 count++;
>
>             }
>
>             out.collect(count);
>
>         }
>
>     }
>
>
>
> That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
>
>
>
> And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
>
>
>
> Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> ________________________________
>
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Friday, April 8, 2022 11:06 PM
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org <us...@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
>
>
> Hi Alexis,
>
> If I understand correctly, the provided StateProcessor program gives 
> you the number of stream elements per operator. However, you mentioned 
> that these operators have collection-type states (ListState and 
> MapState). That means that per one entry there can be an arbitrary 
> number of state elements.
>
> Have you tried estimating the state sizes directly via readKeyedState[1]?
>
> > The other operator does override and call clear()
> Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/or
> g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.la
> ng.String-org.apache.flink.state.api.functions.KeyedStateReaderFunctio
> n-
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org
> /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessW
> indowFunction.Context-
>
> Regards,
> Roman
>
>
> On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> <al...@microfocus.com> wrote:
> >
> > Hello,
> >
> >
> >
> > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> >
> >
> >
> > Windows are 11 minutes in size.
> > Slide time is 1 minute.
> > Throughput is approximately 20 events per minute.
> >
> >
> >
> > I have 3 operators with these states:
> >
> >
> >
> > Window state with ListState<Integer> and no TTL.
> > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> >
> >
> >
> > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> >
> >
> >
> > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> >
> >
> >
> > 10 entries
> > 80 entries
> > 200 entries
> >
> >
> >
> > I got those numbers with something like this:
> >
> >
> >
> > savepoint
> >
> >         .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> > Time.minutes(1L)))
> >
> >         .process(...)
> >
> >         .collect()
> >
> >         .parallelStream()
> >
> >         .reduce(0, Integer::sum);
> >
> >
> >
> > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> >
> >
> >
> > Those amounts cannot possibly account for 614MB, so what am I missing?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Alexis,

Thanks a lot for sharing this. I think the program is correct.
Although it doesn't take timers into account; and to estimate the
state size more accurately, you could also use the same serializers
used by the job.
But maybe it makes more sense to compare the counts of objects in
different checkpoints and see which state is growing.

If the number of keys is small, compaction should eventually clean up
the old values, given that the windows eventually expire. I think it
makes sense to check that watermarks in all windows are making
progress.

Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the
results of the State Processor program.

Regards,
Roman

On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Some additional information that I’ve gathered:
>
>
>
> The number of unique keys in the system is 10, and that is correctly reflected in the state.
> TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> GitHub gist with the whole processor setup since it’s not too long: https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
>
>
>
> Relevant configuration entries (explicitly set, others are left with defaults):
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> execution.checkpointing.interval: 30 s
>
> execution.checkpointing.min-pause: 25 s
>
> execution.checkpointing.timeout: 5 min
>
> execution.savepoint-restore-mode: CLAIM
>
> execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>
>
>
> Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> From: Alexis Sarda-Espinosa <al...@microfocus.com>
> Sent: Samstag, 9. April 2022 01:39
> To: roman@apache.org
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
>
>
> Hi Roman,
>
>
>
> Here's an example of a WindowReaderFunction:
>
>
>
>     public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
>
>         private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
>
>                 "descriptorId",
>
>                 Integer.class
>
>         );
>
>
>
>         @Override
>
>         public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
>
>             int count = 0;
>
>             for (Integer i : context.windowState().getListState(LSD).get()) {
>
>                 count++;
>
>             }
>
>             out.collect(count);
>
>         }
>
>     }
>
>
>
> That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
>
>
>
> And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
>
>
>
> Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> ________________________________
>
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Friday, April 8, 2022 11:06 PM
> To: Alexis Sarda-Espinosa <al...@microfocus.com>
> Cc: user@flink.apache.org <us...@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
>
>
> Hi Alexis,
>
> If I understand correctly, the provided StateProcessor program gives
> you the number of stream elements per operator. However, you mentioned
> that these operators have collection-type states (ListState and
> MapState). That means that per one entry there can be an arbitrary
> number of state elements.
>
> Have you tried estimating the state sizes directly via readKeyedState[1]?
>
> > The other operator does override and call clear()
> Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-
>
> Regards,
> Roman
>
>
> On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> <al...@microfocus.com> wrote:
> >
> > Hello,
> >
> >
> >
> > I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> >
> >
> >
> > Windows are 11 minutes in size.
> > Slide time is 1 minute.
> > Throughput is approximately 20 events per minute.
> >
> >
> >
> > I have 3 operators with these states:
> >
> >
> >
> > Window state with ListState<Integer> and no TTL.
> > Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> >
> >
> >
> > Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
> >
> >
> >
> > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
> >
> >
> >
> > 10 entries
> > 80 entries
> > 200 entries
> >
> >
> >
> > I got those numbers with something like this:
> >
> >
> >
> > savepoint
> >
> >         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
> >
> >         .process(...)
> >
> >         .collect()
> >
> >         .parallelStream()
> >
> >         .reduce(0, Integer::sum);
> >
> >
> >
> > Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
> >
> >
> >
> > Those amounts cannot possibly account for 614MB, so what am I missing?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >

RE: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Some additional information that I've gathered:


  *   The number of unique keys in the system is 10, and that is correctly reflected in the state.
  *   TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
  *   Not sure it's relevant, but the Flink cluster does run with jemalloc enabled.
  *   GitHub gist with the whole processor setup since it's not too long: https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678

Relevant configuration entries (explicitly set, others are left with defaults):

state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 30 s
execution.checkpointing.min-pause: 25 s
execution.checkpointing.timeout: 5 min
execution.savepoint-restore-mode: CLAIM
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

Over the weekend, state size has grown to 1.23GB with the operators referenced in the processor program taking 849MB, so I'm still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn't make sense if I have finite keys, right?

Regards,
Alexis.

From: Alexis Sarda-Espinosa <al...@microfocus.com>
Sent: Samstag, 9. April 2022 01:39
To: roman@apache.org
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Roman,

Here's an example of a WindowReaderFunction:

    public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
        private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
                "descriptorId",
                Integer.class
        );

        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
            int count = 0;
            for (Integer i : context.windowState().getListState(LSD).get()) {
                count++;
            }
            out.collect(count);
        }
    }

That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().


Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>>
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa <al...@microfocus.com>>
Cc: user@flink.apache.org<ma...@flink.apache.org> <us...@flink.apache.org>>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
<al...@microfocus.com>> wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I've been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job's setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and I'll note here that I see 50 folders named chk-*** even though I don't set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>
>         .process(...)
>
>         .collect()
>
>         .parallelStream()
>
>         .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
Hi Roman,

Here's an example of a WindowReaderFunction:

    public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
        private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
                "descriptorId",
                Integer.class
        );

        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
            int count = 0;
            for (Integer i : context.windowState().getListState(LSD).get()) {
                count++;
            }
            out.collect(count);
        }
    }

That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().

Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa <al...@microfocus.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>
>         .process(...)
>
>         .collect()
>
>         .parallelStream()
>
>         .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>

Re: RocksDB's state size discrepancy with what's seen with state processor API

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
<al...@microfocus.com> wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>
>         .process(...)
>
>         .collect()
>
>         .parallelStream()
>
>         .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>