Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/05/09 16:24:57 UTC

[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5979

    [FLINK-9070][state]improve the performance of RocksDBMapState.clear() 

    ## What is the purpose of the change
    
    This PR intends to improve the performance of `RocksDBMapState.clear()` based on the following:
    
    - Using RocksIterator to iterate over the records directly (currently we use the RocksDBMapIterator, which buffers the records and may also need to seek multiple times).
    - Using WriteBatch to perform deleting in bulk.
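
The two points above can be illustrated with a minimal sketch. It does not use RocksDB: a `TreeMap` ordered by unsigned byte comparison stands in for the sorted key space, `tailMap(prefix, true)` stands in for seeking an iterator to the prefix, and a list of collected keys stands in for a write batch. The class and method names (`BatchedPrefixClear`, `clearPrefix`, `startsWith`) are hypothetical and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

class BatchedPrefixClear {

    // Unsigned lexicographic order on byte arrays (the stand-in for the store's key order).
    static final Comparator<byte[]> UNSIGNED_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Stand-in for the sorted key-value store (NOT a RocksDB instance).
    static TreeMap<byte[], byte[]> newStore() {
        return new TreeMap<>(UNSIGNED_ORDER);
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Removes every entry whose key starts with prefix; returns the number removed.
    static int clearPrefix(TreeMap<byte[], byte[]> store, byte[] prefix) {
        List<byte[]> batch = new ArrayList<>();
        // One positioning step, then a forward scan until the prefix no longer matches.
        for (byte[] key : store.tailMap(prefix, true).keySet()) {
            if (!startsWith(key, prefix)) {
                break;
            }
            batch.add(key);
        }
        // Apply all collected deletes together, as a write batch would.
        for (byte[] key : batch) {
            store.remove(key);
        }
        return batch.size();
    }
}
```

With keys `{1,1}`, `{1,2}` and `{2,1}` in the store, calling `clearPrefix(store, new byte[]{1})` removes the first two and leaves the third untouched.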
    
    
    ## Brief change log
    
      - *improve the `RocksDBMapState.clear()` via iterating the records directly and deleting records using WriteBatch.*
    
    ## Verifying this change
    
    This change is already covered by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
    no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink improveMapStateClear

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5979
    
----
commit 969036a32b49b32a4168afb747dff54fe9ada6e6
Author: sihuazhou <su...@...>
Date:   2018-04-07T13:52:05Z

    improve the RocksDBMapState.clear() via iterate the records directly and delete records using WriteBatch.

----


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    @StephanEwen, I ran a micro-benchmark; here are the results:
    ```
    ---------> Batch VS Put <------------
    BATCH: end insert - duration:255
    PUT: end insert - duration:545
    
    ---------> MapState#Clear New VS Old <------------
    ---->
    NEW: end delete 50 records - duration:1077719
    OLD: end delete 50 records - duration:10949887
    ---->
    NEW: end delete 100 records - duration:809182
    OLD: end delete 100 records - duration:1617317
    ---->
    NEW: end delete 200 records - duration:1970156
    OLD: end delete 200 records - duration:2731749
    ---->
    NEW: end delete 400 records - duration:2492822
    OLD: end delete 400 records - duration:13894767
    ---->
    NEW: end delete 800 records - duration:5816919
    OLD: end delete 800 records - duration:13017591
    ```
    
    I tested two things:
    - The performance of `WriteBatch()` vs. `Put()`
    - The performance of `RocksDBMapState#clear()`: new version vs. old version
    
    In general, the more records there are, the more lift we get from the new version. Here is my code for the test: https://github.com/sihuazhou/flink/commit/75504ad6fdb33755cccef43935e007bd5804ea9d


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    @sihuazhou I wonder why you would choose iterator + batched write over simply calling `db.deleteRange(...)`, where the start key is `serializeCurrentKeyAndNamespace()` and the end key is the start key with its last byte increased by one. That seems like an even better idea to me, what do you think?
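
As a minimal sketch of the end-key construction described in this comment: the helper below derives an exclusive upper bound from a prefix by incrementing its last byte. The class and method names are hypothetical, and the handling of a trailing `0xFF` byte (drop it and carry into the preceding byte) is an assumption added here; the comment itself only mentions increasing the last byte by one. No RocksDB call is made.

```java
import java.util.Arrays;

class PrefixRangeEnd {

    /**
     * Returns the smallest key (in unsigned byte order) that is greater than
     * every key starting with prefix, or null if no such key exists
     * (the prefix is empty or consists only of 0xFF bytes).
     */
    static byte[] exclusiveEndKey(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                // Keep bytes 0..i and bump the last kept byte by one.
                byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++;
                return end;
            }
            // A 0xFF byte cannot be incremented: drop it and carry to the left.
        }
        return null;
    }
}
```

For the prefix `{1, 2}` this yields `{1, 3}`; for `{1, 0xFF}` it yields `{2}`.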


---

[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5979


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Hmm... there is another reason: the main performance overhead is the `seek()`. Even if we use `deleteRange()` to implement this, we still need to get the last key of the entries, which means we still need to iterate over all the entries. So the `deleteRange()` approach does not seem to give an obvious lift over this approach.


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    LGTM +1


---

[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5979#discussion_r187278034
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
    @@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
     		return isNull ? null : valueSerializer.deserialize(in);
     	}
     
    +	private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
    +		if (rawKeyBytes.length < keyPrefixBytes.length) {
    +			return false;
    +		}
    +
    +		for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
    --- End diff --
    
    Ah, this is a function that was simply pulled out of `RocksDBMapState#RocksDBMapIterator`, where its name was `underSameKey()`. I didn't change any code related to its implementation.
    
    But concerning this loop: yes, I was the person who wrote this loop for this method (`underSameKey()`), and your suggestion was the first version I implemented. During the review, @StefanRRichter suggested the current style, and I feel it looks simpler, so I finally made it into the current shape. Personally, I am not against the current version.


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Thank you all @StefanRRichter @StephanEwen @bowenli86 


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Hi @StefanRRichter, I'm not sure whether we can do that without `seek()`, because the length of the `key bytes` is not fixed, which may lead to wrong deletions. What do you think?
    
    Sure, rebasing...


---

[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5979#discussion_r187255609
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---
    @@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
     		return isNull ? null : valueSerializer.deserialize(in);
     	}
     
    +	private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
    +		if (rawKeyBytes.length < keyPrefixBytes.length) {
    +			return false;
    +		}
    +
    +		for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
    --- End diff --
    
    I recommend moving `--i` to the increment part of the `for` loop, instead of keeping it in the termination part
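
To make the two loop styles under discussion concrete, here is a minimal sketch that places them side by side. The loop body is not shown in the quoted diff, so the byte comparison below is an assumption, and the `keyGroupPrefixBytes` parameter is a hypothetical stand-in for `backend.getKeyGroupPrefixBytes()`. Both methods visit the same indices, from `keyPrefixBytes.length - 1` down to `keyGroupPrefixBytes`.

```java
class PrefixCheckLoopStyles {

    // Loop header as quoted in the diff: the decrement is part of the termination test.
    static boolean decrementInCondition(byte[] keyPrefixBytes, byte[] rawKeyBytes, int keyGroupPrefixBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        for (int i = keyPrefixBytes.length; --i >= keyGroupPrefixBytes; ) {
            if (rawKeyBytes[i] != keyPrefixBytes[i]) { // assumed body, not in the quoted diff
                return false;
            }
        }
        return true;
    }

    // Suggested style: the decrement moves to the update part of the for loop.
    static boolean decrementInUpdate(byte[] keyPrefixBytes, byte[] rawKeyBytes, int keyGroupPrefixBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        for (int i = keyPrefixBytes.length - 1; i >= keyGroupPrefixBytes; --i) {
            if (rawKeyBytes[i] != keyPrefixBytes[i]) { // assumed body, not in the quoted diff
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions the two variants return the same result for every input, so the choice between them is purely one of readability.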


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    @StefanRRichter, the reasons I prefer this approach are:
    
    - From the comments in RocksDB's source we can see that deleteRange() should be used for deleting big ranges; what if the number of entries in the map is not that big?
    
    - From the comments we can also see that deleteRange() would hurt read performance, so we should consider setting ReadOptions::ignore_range_deletions = true to avoid the negative effect of deleteRange(). But if we use it for MapState.clear(), it seems that we can't set ReadOptions::ignore_range_deletions = true.
    
    And the current approach should not bring any downside. What do you think?


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Could you share some micro-benchmark numbers?
    When we change something that we know works well to something new, it would be good to understand what benefits we are talking about.


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    With the approach I outlined, we would not require any `seek()` to the last key; we can simply create the exclusive end key. Nevertheless, you are right about the comment that is only in the C++ code but not in the Java API. So I am fine with the current implementation.
    
    Could you please rebase on the latest master so that your code can use `RocksIteratorWrapper`?


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    cc @StefanRRichter (This is for 1.6; I'm just completing it now while I have time.)


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    LGTM, will merge.


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Hi @StefanRRichter I rebased the PR, could you please have a look?


---

[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5979
  
    Okay, looks really good from my side.
    
    It would be good if @StefanRRichter or @azagrebin could double check the change; otherwise good to go.


---