Posted to user@flink.apache.org by Yun Tang <my...@live.com> on 2020/10/02 03:51:44 UTC

Re: Poor performance with large keys using RocksDB and MapState

Hi

The 'setCacheIndexAndFilterBlocks' option is there so that Flink can manage RocksDB's memory usage. Could you share logs, or describe in more detail why setCacheIndexAndFilterBlocks seems to stop the hash index from working properly?

My guess is that index and filter blocks are now more likely to be evicted from the block cache because they compete with data blocks [1], although Flink tries its best to minimize this regression. Please consider increasing the total block cache size or decreasing state.backend.rocksdb.memory.write-buffer-ratio [2].

[1] https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#memory-management
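
As a rough sketch of the suggestion above, the ratio can be set on the Flink Configuration before the environment is created. The values 0.3 and 0.2 are illustrative assumptions, not recommendations from this thread. The second key, state.backend.rocksdb.memory.high-prio-pool-ratio, is not mentioned above; it is included on the assumption that reserving more of the cache for index and filter blocks may also help, and it should be verified against the memory management page [2].

```scala
import org.apache.flink.configuration.Configuration

object RocksDbMemoryTuningSketch {
  // Builds a Configuration with the memory-related keys adjusted.
  // All numeric values here are illustrative assumptions.
  def tunedConfiguration(): Configuration = {
    val conf = new Configuration()

    // Lower the share of managed memory that write buffers may take
    // (the documented default is 0.5), leaving more for the block cache.
    conf.setString("state.backend.rocksdb.memory.write-buffer-ratio", "0.3")

    // Assumption, not from the thread: enlarge the high-priority pool that
    // holds index and filter blocks (the documented default is 0.1).
    conf.setString("state.backend.rocksdb.memory.high-prio-pool-ratio", "0.2")

    conf
  }
}
```

Increasing the total block cache size would instead mean giving the task manager more managed memory, for example through TaskManagerOptions.MANAGED_MEMORY_SIZE as in the original reproduction code.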

Best
Yun Tang
________________________________
From: <ya...@gmail.com>
Sent: Tuesday, September 29, 2020 17:49
To: Yun Tang <my...@live.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Poor performance with large keys using RocksDB and MapState

Thanks, Yun!
I used this option, and it helped greatly:


val be = new RocksDBStateBackend("file:///tmp")

class MyConfig extends DefaultConfigurableOptionsFactory {
  override def createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    super.createColumnOptions(currentOptions, handlesToClose).optimizeForPointLookup(2000)
  }
}

be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)

But now I can't use the RocksDBSharedResources, because setCacheIndexAndFilterBlocks seems to stop the hash index from working properly, and the performance is bad again.
It only works when I use be.getMemoryConfiguration.setUseManagedMemory(false) and skip setCacheIndexAndFilterBlocks :(





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang <my...@live.com> wrote:
Hi

If you want to improve point-lookup performance, you could try adding a hash index. This feature requires a prefix extractor; however, the original interface is not exposed directly in the Java API.

You could try calling columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb), which uses the NoopTransform prefix extractor by default [1].
Please also consider using this feature only after Flink 1.10.2, due to an internal RocksDB bug [2].

[1] https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
[2] https://issues.apache.org/jira/browse/FLINK-17800

Best
Yun Tang


________________________________
From: <ya...@gmail.com>
Sent: Wednesday, September 23, 2020 23:56
To: user@flink.apache.org <us...@flink.apache.org>
Subject: Poor performance with large keys using RocksDB and MapState

Hello,
I have a poor throughput issue, and I think I managed to reproduce it using the following code:

val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  .setStateBackend(be)


env.setParallelism(3)
env.getConfig.enableObjectReuse()

val r = new scala.util.Random(31)
val randStr = r.nextString(4992)
val s = env.fromElements(1).process((value: Int, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
  for (a <- 1 to 1000 * 1000 * 10) {
    out.collect( randStr + r.nextString(8) )
  }
}).keyBy(a => a).process(new ProcessFunction[String, String] {
  private var someState: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    someState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, String]("someState", createTypeInformation[String], createTypeInformation[String])
    )
  }

  override def processElement(value: _root_.scala.Predef.String, ctx: _root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String, _root_.scala.Predef.String]#Context, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
    if (!someState.contains(value)) {
      someState.put(value, value)
    }
  }
})

env.execute()

This has really poor throughput.
Now changing
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
solves the issue.
Is there an easy way to fix this?
I tried to use a hash index, but it requires a RocksDB option called "prefix extractor", which I don't know how to set yet, and I have no idea whether it will fix the problem.
If anyone has encountered this before, I could really use some advice/help. Thanks!
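
One possible reading of why the two key layouts above behave so differently, offered as an assumption rather than a confirmed diagnosis: if the stored keys are ordered by byte-by-byte comparison (as RocksDB's default bytewise comparator does), every comparison between two keys that share a long leading part has to walk that whole shared part before it finds a difference. The sketch below only counts inspected bytes for the two layouts. It does not involve Flink or RocksDB; the object and method names are hypothetical, and the shared part is a fixed run of "x" instead of a random string, to keep the result deterministic.

```scala
object SharedPrefixSketch {
  /** Compares two keys as unsigned bytes.
    * Returns (comparison result, number of byte positions inspected). */
  def compareCounting(a: Array[Byte], b: Array[Byte]): (Int, Int) = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n && a(i) == b(i)) i += 1
    if (i < n) (Integer.compare(a(i) & 0xff, b(i) & 0xff), i + 1)
    else (Integer.compare(a.length, b.length), n)
  }

  // Stand-in for randStr: 4992 characters shared by every key.
  val shared: String = "x" * 4992

  private def bytes(s: String): Array[Byte] = s.getBytes("UTF-8")

  /** Bytes inspected for one comparison: (shared part first, distinct part first). */
  def inspectedBytes(): (Int, Int) = {
    val sharedFirst =
      compareCounting(bytes(shared + "aaaaaaaa"), bytes(shared + "baaaaaaa"))._2
    val distinctFirst =
      compareCounting(bytes("aaaaaaaa" + shared), bytes("baaaaaaa" + shared))._2
    (sharedFirst, distinctFirst)
  }

  def main(args: Array[String]): Unit = {
    val (sharedFirst, distinctFirst) = inspectedBytes()
    println(s"shared part first:   $sharedFirst bytes inspected per comparison")
    println(s"distinct part first: $distinctFirst bytes inspected per comparison")
  }
}
```

With the shared part first, a single comparison inspects 4993 bytes; with the distinct part first, it inspects 1. Whether this fully explains the throughput difference in the job above is not established here, since key size relative to the configured block size may also play a role.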