Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/05/09 16:24:57 UTC
[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/5979
[FLINK-9070][state]improve the performance of RocksDBMapState.clear()
## What is the purpose of the change
This PR intends to improve the performance of `RocksDBMapState.clear()` based on the following changes:
- Using a RocksIterator to iterate over the records directly (currently we use the RocksDBMapIterator, which buffers the records and may also need to perform multiple seeks).
- Using a WriteBatch to perform the deletions in bulk (a minimal sketch of the combined approach follows below).
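The sketch below is a self-contained illustration of the technique only, not the PR code: it uses plain string prefixes and the default column family instead of Flink's real key-group/namespace encoding.
```java
import org.rocksdb.*;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Demo: prefix-scan with a RocksIterator and delete the matching keys in one WriteBatch.
public class BatchClearDemo {

    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        String path = Files.createTempDirectory("rocksdb-clear-demo").toString();

        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, path);
             WriteOptions writeOptions = new WriteOptions()) {

            // Insert a few entries that share the prefix "key#7#".
            for (int i = 0; i < 5; i++) {
                db.put(("key#7#" + i).getBytes(StandardCharsets.UTF_8),
                        ("value" + i).getBytes(StandardCharsets.UTF_8));
            }

            byte[] prefix = "key#7#".getBytes(StandardCharsets.UTF_8);

            try (RocksIterator iterator = db.newIterator();
                 WriteBatch writeBatch = new WriteBatch()) {

                iterator.seek(prefix);
                // Collect all keys under the prefix into a single batch.
                while (iterator.isValid() && startsWith(iterator.key(), prefix)) {
                    writeBatch.remove(iterator.key()); // newer RocksJava versions name this delete()
                    iterator.next();
                }
                // One write applies all queued deletes.
                db.write(writeOptions, writeBatch);
            }
        }
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```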
## Brief change log
- *Improve `RocksDBMapState.clear()` by iterating over the records directly and deleting them with a WriteBatch.*
## Verifying this change
This change is already covered by existing tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
no
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink improveMapStateClear
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5979.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5979
----
commit 969036a32b49b32a4168afb747dff54fe9ada6e6
Author: sihuazhou <su...@...>
Date: 2018-04-07T13:52:05Z
improve the RocksDBMapState.clear() via iterate the records directly and delete records using WriteBatch.
----
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
@StephanEwen , I ran a micro-benchmark; here are the results:
```
---------> Batch VS Put <------------
BATCH: end insert - duration:255
PUT: end insert - duration:545
---------> MapState#Clear New VS Old <------------
---->
NEW: end delete 50 records - duration:1077719
OLD: end delete 50 records - duration:10949887
---->
NEW: end delete 100 records - duration:809182
OLD: end delete 100 records - duration:1617317
---->
NEW: end delete 200 records - duration:1970156
OLD: end delete 200 records - duration:2731749
---->
NEW: end delete 400 records - duration:2492822
OLD: end delete 400 records - duration:13894767
---->
NEW: end delete 800 records - duration:5816919
OLD: end delete 800 records - duration:13017591
```
I tested two things:
- Comparing the performance of `WriteBatch()` vs. `Put()`
- Comparing the performance of `RocksDBMapState#clear()`: new version vs. old version
In general, the more records there are, the bigger the lift we get from the new version. Here is my code for the test: https://github.com/sihuazhou/flink/commit/75504ad6fdb33755cccef43935e007bd5804ea9d
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5979
@sihuazhou I wonder why you would choose iterator + batched write over simply calling `db.deleteRange(...)`, where the start key is `serializeCurrentKeyAndNamespace()` and the end key is the start key with its last byte increased by one. That seems like an even better idea to me, what do you think?
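For readers following along, a rough sketch of that alternative (not part of this PR; `db`, `columnFamily`, and the serialization helper are assumed to be in scope, and the 0xFF carry case is ignored for brevity):
```java
// Sketch of the suggested deleteRange(...) alternative -- not what this PR implements.
byte[] begin = serializeCurrentKeyAndNamespace();         // assumed helper producing the key prefix
byte[] end = java.util.Arrays.copyOf(begin, begin.length);
end[end.length - 1]++;                                    // exclusive end key: last byte + 1
db.deleteRange(columnFamily, begin, end);                 // deletes the range [begin, end) in one call
```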
---
[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5979
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
Hmm... there is another reason: the main performance overhead is actually the `seek()`. Even if we used `deleteRange()` to implement this, we would still need to find the last key of the entries, which means we would still need to iterate over all the entries. So the `deleteRange()` approach does not seem to bring an obvious lift over this one.
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:
https://github.com/apache/flink/pull/5979
LGTM +1
---
[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5979#discussion_r187278034
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
@@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
return isNull ? null : valueSerializer.deserialize(in);
}
+ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+ if (rawKeyBytes.length < keyPrefixBytes.length) {
+ return false;
+ }
+
+ for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
--- End diff --
Ah, this is a function that was simply pulled out of `RocksDBMapState#RocksDBMapIterator`; there its name was `underSameKey()`, and I didn't change anything in its implementation.
But concerning this loop: yes, I was the person who wrote it for that method (`underSameKey()`), and your suggestion was actually the first version I implemented. During review, @StefanRRichter suggested the current style, and I felt it looked simpler, so that is the shape it ended up in. Personally I'm not against the current version.
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
Thank you all @StefanRRichter @StephanEwen @bowenli86
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
Hi @StefanRRichter, I'm not sure whether we can do that without `seek()`, because the key bytes' length is not fixed, which could lead to deleting the wrong entries. What do you think?
Sure, rebasing...
---
[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5979#discussion_r187255609
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
@@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
return isNull ? null : valueSerializer.deserialize(in);
}
+ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+ if (rawKeyBytes.length < keyPrefixBytes.length) {
+ return false;
+ }
+
+ for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
--- End diff --
I recommend moving `--i` to the increment part of the `for` loop, instead of keeping it in the termination part
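To make the suggestion concrete, here is a sketch of the two equivalent loop shapes (the helper is reconstructed with `keyGroupPrefixBytes` passed in as a parameter rather than read from `backend`):
```java
// Reconstructed sketch of startWithKeyPrefix(), comparing the two loop styles discussed above.
private static boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes, int keyGroupPrefixBytes) {
    if (rawKeyBytes.length < keyPrefixBytes.length) {
        return false;
    }
    // Current form in the diff: the decrement happens inside the termination check.
    //     for (int i = keyPrefixBytes.length; --i >= keyGroupPrefixBytes; ) { ... }
    // Suggested form: same iteration range, with the decrement in the update part.
    for (int i = keyPrefixBytes.length - 1; i >= keyGroupPrefixBytes; --i) {
        if (rawKeyBytes[i] != keyPrefixBytes[i]) {
            return false;
        }
    }
    return true;
}
```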
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
@StefanRRichter , the reasons I prefer this approach are:
- The comments in RocksDB's source code say that deleteRange() should be used for deleting big ranges, but the number of entries in the map may not be that big.
- The comments also say that deleteRange() can hurt read performance, so we should consider setting ReadOptions::ignore_range_deletions = true to avoid that negative effect. But if we use it for MapState.clear(), it seems we can't set ReadOptions::ignore_range_deletions = true (see the snippet after this list).
And the current approach should not bring any downside. What do you think?
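For context, that flag looks roughly like this on the Java side (a sketch only, assuming `ReadOptions#setIgnoreRangeDeletions` mirrors the C++ `ReadOptions::ignore_range_deletions`, with `db` and `columnFamily` in scope):
```java
// Reads through this iterator skip range tombstones: faster reads, but data removed
// by deleteRange() may still be visible. That trade-off is why the flag cannot simply
// be enabled globally if deleteRange() were to back MapState.clear().
try (ReadOptions readOptions = new ReadOptions().setIgnoreRangeDeletions(true);
     RocksIterator iterator = db.newIterator(columnFamily, readOptions)) {
    // ... iterate as usual ...
}
```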
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5979
Could you share some micro-benchmark numbers?
When we change something that we know works well into something new, it would be good to understand what benefits we are talking about.
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5979
With the approach I outlined, we would not require any `seek()` to find the last key; we could simply construct the exclusive end key. Nevertheless, you are right about the comment that only exists in the C++ code but not in the Java API. So I am fine with the current implementation.
Could you please rebase on the latest master so that your code can use `RocksIteratorWrapper`?
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
cc @StefanRRichter (This is for 1.6; I just completed it now while I have some time.)
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5979
LGTM, will merge.
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5979
Hi @StefanRRichter I rebased the PR, could you please have a look?
---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5979
Okay, looks really good from my side.
It would be good if @StefanRRichter or @azagrebin could double-check the change; otherwise, good to go.
---