Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2020/05/19 06:28:00 UTC

[jira] [Commented] (FLINK-17800) RocksDB optimizeForPointLookup results in missing time windows

    [ https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110892#comment-17110892 ] 

Yun Tang commented on FLINK-17800:
----------------------------------

[~YordanPavlov] Thanks for reporting this. 

Have you tried Flink-1.10 without optimizeForPointLookup, and does your job still run into this problem?

Basically, optimizeForPointLookup would create an LRU cache with the size from your configuration (e.g. 268435456 bytes in your example). However, since Flink-1.10 we use managed memory for RocksDB by default, and the size of the managed memory is used to create an internal cache instead. You could also try to disable [state-backend-rocksdb-memory-managed|https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed] to see whether your problem still exists.
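If you want to give that a try, it is a single switch in flink-conf.yaml (a sketch; the key is the one from the linked docs):

{code}
# Hand memory management back to RocksDB, as in pre-1.10 releases, so the
# LRU cache created by optimizeForPointLookup should take effect again.
state.backend.rocksdb.memory.managed: false
{code}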

Last but not least, would you please give a simple example with a fake source to reproduce this problem so that we can figure it out?
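Even something minimal along these lines would help (just a rough sketch; the class name and values are placeholders, not anything from your job):

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical fake source: replays a fixed list of event timestamps,
// emitting each element with its own event time and a trailing watermark.
class FakeSource(timestamps: Seq[Long]) extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    for (ts <- timestamps if running) {
      ctx.collectWithTimestamp(ts, ts) // element value doubles as its event time
      ctx.emitWatermark(new Watermark(ts))
    }
  }

  override def cancel(): Unit = running = false
}
{code}

Then {{env.addSource(new FakeSource(Seq(1L, 2L, 10L, 15L)))}} (one instance per partition pattern) combined with your window code should tell us whether the problem reproduces without Kafka.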

> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Priority: Major
>
> +My Setup:+
> We have been using the _RocksDB_ option of _optimizeForPointLookup_ and running version 1.7 for years. Upon upgrading to Flink 1.10 we started seeing strange behavior of missing time windows in a streaming Flink job. For the purpose of testing I experimented with previous Flink versions (1.8, 1.9, 1.9.3) and none of them showed the problem.
>  
> A sample of the code demonstrating the problem is here:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements, List(config.kafkaBootstrapServer)))
> val result = datastream
>   .keyBy(_ => 1)
>   .timeWindow(Time.milliseconds(1))
>   .reduce((a, b) => b) // a window function is needed before the stream can be printed
> result.print()
> {code}
>
> The source consists of 3 streams (either 3 Kafka partitions or 3 Kafka topics); the elements in each stream carry separately increasing event-time timestamps. Taken together the timestamps start from 1 and increase by 1, but they are interleaved across the streams: the first partition would consist of timestamps 1, 2, 10, 15..., the second of 4, 5, 6, 11..., the third of 3, 7, 8, 9...
>  
> +What I observe:+
> The time windows open as I expect for the first 127 timestamps. Then there is a huge gap with no opened windows; if the source has many elements, the next open window has a timestamp in the thousands. This creates a gap of hundreds of elements that appear to be 'lost'. Those elements are not reported as late either (tested with the _sideOutputLateData_ operator). The way we have been using the option is by setting it inside our config like so:
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
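> For anyone reproducing this without our config layer, the equivalent direct wiring would be roughly the sketch below (the checkpoint path is a placeholder, and the pre-1.10 OptionsFactory API we have been running on is assumed):
> {code:java}
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
> import org.rocksdb.{ColumnFamilyOptions, DBOptions}
>
> val backend = new RocksDBStateBackend("hdfs:///placeholder/checkpoints")
> backend.setOptions(new OptionsFactory {
>   override def createDBOptions(current: DBOptions): DBOptions = current
>   override def createColumnOptions(current: ColumnFamilyOptions): ColumnFamilyOptions =
>     current.optimizeForPointLookup(268435456L) // the value from our config key above
> })
> env.setStateBackend(backend)
> {code}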
> We have been using it for performance reasons, as we have a huge RocksDB state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)