Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:05:29 UTC

[GitHub] [flink] lgo opened a new pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo opened a new pull request #12345:
URL: https://github.com/apache/flink/pull/12345


   ## WIP
   
   I was hoping to get a first round of feedback on this implementation. This branch currently passes tests, but there is still additional cleanup work:
   
   - [ ] More testing of the new implementation.
   - [ ] Add benchmarking to RocksDB for the two implementations used here.
   - [ ] Add more configuration parameters to pass through to the writers.
   - [ ] Compare savepoint recovery performance on our production instance, as it has large state.
   
   ## What is the purpose of the change
   
   This adds a new mode of batch writing keys into RocksDB, via `RocksDBSSTIngestWriter`, which should considerably improve the performance of some operations, such as savepoint recovery. This follows up on the discussion I started on the user mailing list, [here](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-savepoint-recovery-performance-improvements-td35238.html), as well as the previously reported ticket: https://issues.apache.org/jira/browse/FLINK-17288.
   
   The first commit also replaces one iterate-and-delete operation on RocksDB with a more efficient one (`deleteRange`).
   
   ## Brief change log
   
   (commit 1)
   - Replaced an iterate and `delete` operation with `RocksDB#deleteRange`.
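
   A minimal, illustrative model of the difference between the two approaches (not the PR code). It assumes a `TreeMap` stands in for one column family, ordered by unsigned lexicographic byte comparison like RocksDB's default bytewise comparator, so it runs without RocksJava. In the actual change the single call is `RocksDB#deleteRange(ColumnFamilyHandle, byte[], byte[])`, which removes keys in the half-open range `[beginKey, endKey)`. `DeleteRangeSketch` and its helpers are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.TreeMap;

   // Illustrative model only: a TreeMap stands in for one RocksDB column family,
   // ordered by unsigned lexicographic comparison (like the default bytewise comparator).
   class DeleteRangeSketch {

       static TreeMap<byte[], byte[]> newStore() {
           return new TreeMap<byte[], byte[]>((a, b) -> Arrays.compareUnsigned(a, b));
       }

       // Old approach: walk every key in [begin, end) and issue one delete per key.
       // Returns how many individual deletes were needed.
       static int iterateAndDelete(TreeMap<byte[], byte[]> store, byte[] begin, byte[] end) {
           // Snapshot the keys first so the map is not modified while iterating over it.
           List<byte[]> keys = new ArrayList<>(store.subMap(begin, true, end, false).keySet());
           for (byte[] key : keys) {
               store.remove(key);
           }
           return keys.size();
       }

       // New approach: one operation over the half-open range [begin, end),
       // mirroring the begin-inclusive, end-exclusive contract of RocksDB#deleteRange.
       static void deleteRange(TreeMap<byte[], byte[]> store, byte[] begin, byte[] end) {
           store.subMap(begin, true, end, false).clear();
       }

       // Populates single-byte keys 0..9.
       static void fill(TreeMap<byte[], byte[]> store) {
           for (int i = 0; i < 10; i++) {
               store.put(new byte[] {(byte) i}, new byte[] {(byte) (i * 2)});
           }
       }

       public static void main(String[] args) {
           TreeMap<byte[], byte[]> a = newStore();
           TreeMap<byte[], byte[]> b = newStore();
           fill(a);
           fill(b);
           int deletes = iterateAndDelete(a, new byte[] {3}, new byte[] {7});
           deleteRange(b, new byte[] {3}, new byte[] {7});
           // Both leave keys 0, 1, 2, 7, 8, 9; the first needed one delete per removed key.
           System.out.println(a.size() + " " + b.size() + " " + deletes); // prints "6 6 4"
       }
   }
   ```

   The end state is identical; the difference is that the range delete is a single operation instead of one seek-and-delete per key, which is where the efficiency gain is expected to come from.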
   
   (commit 2)
   - Refactored the use of `RocksDBWriteBatchWrapper` into a factory (`RocksDBWriterFactory`) and an interface (`RocksDBWriter`), in preparation for adding a second implementation.
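
   A hypothetical sketch of the shape of this refactoring. `RocksDBWriter` and `RocksDBWriterFactory` are the names used in this PR, but the method signatures, the `WriterType` enum, and `WriteBatchStyleWriter` are assumptions for illustration. `String` keys and an in-memory map stand in for `byte[]` and RocksDB, so it runs without the native library.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   class WriterFactorySketch {
       public static void main(String[] args) {
           Map<String, TreeMap<String, String>> db = new TreeMap<>();
           RocksDBWriterFactory factory =
               new RocksDBWriterFactory(RocksDBWriterFactory.WriterType.WRITE_BATCH, 2);
           try (RocksDBWriter writer = factory.create(db)) {
               writer.put("default", "k1", "v1");
               writer.put("default", "k2", "v2"); // batch is full: flushed here
               writer.put("state", "k3", "v3");   // still buffered
           } // close() flushes the remaining entry
           System.out.println(db);
       }
   }

   // Hypothetical interface: method names and signatures are assumptions.
   interface RocksDBWriter extends AutoCloseable {
       void put(String columnFamily, String key, String value);

       void flush();

       @Override
       void close();
   }

   // Models the existing write-batch approach: buffer puts, apply them together when full.
   class WriteBatchStyleWriter implements RocksDBWriter {
       private final Map<String, TreeMap<String, String>> db;
       private final int capacity;
       private final List<String[]> batch = new ArrayList<>();

       WriteBatchStyleWriter(Map<String, TreeMap<String, String>> db, int capacity) {
           this.db = db;
           this.capacity = capacity;
       }

       @Override
       public void put(String columnFamily, String key, String value) {
           batch.add(new String[] {columnFamily, key, value});
           if (batch.size() >= capacity) {
               flush();
           }
       }

       @Override
       public void flush() {
           for (String[] e : batch) {
               db.computeIfAbsent(e[0], cf -> new TreeMap<>()).put(e[1], e[2]);
           }
           batch.clear();
       }

       @Override
       public void close() {
           flush();
       }
   }

   // The single place that decides which writer implementation is used.
   class RocksDBWriterFactory {
       enum WriterType { WRITE_BATCH, SST_INGEST }

       private final WriterType type;
       private final int batchCapacity;

       RocksDBWriterFactory(WriterType type, int batchCapacity) {
           this.type = type;
           this.batchCapacity = batchCapacity;
       }

       RocksDBWriter create(Map<String, TreeMap<String, String>> db) {
           switch (type) {
               case WRITE_BATCH:
                   return new WriteBatchStyleWriter(db, batchCapacity);
               default:
                   // Placeholder for the second implementation added in a later commit.
                   throw new UnsupportedOperationException("writer type not implemented: " + type);
           }
       }
   }
   ```

   Because callers only see `RocksDBWriter`, adding the SST ingestion writer later only needs a new branch in `create` plus the configuration that selects it.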
   
   (commit 3)
   - Added `RocksDBSSTWriter`, a basic wrapper around `SstFileWriter` for creating `sst` files.
   - Added `RocksDBSSTIngestWriter`, which uses `RocksDBSSTWriter` and provides a write interface for batch writing k/v pairs into RocksDB. This includes flushing and handling multiple column families.
   - Added new configuration for opting into the writer, as well as tuning parameters. This configuration was plumbed into `RocksDBWriterFactory`.
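
   A minimal model of the buffering logic described above, under stated assumptions: one buffer per column family; keys must arrive in strictly increasing order within a column family (as `SstFileWriter` requires); and a buffer is flushed as one "file" once it reaches `maxSstSize`, approximated here by character count. `String` ordering stands in for the bytewise comparator, and flushing just copies entries into an in-memory map. In the real writer that step would roughly correspond to finishing an sst file with `SstFileWriter` and then calling `RocksDB#ingestExternalFile`. `SstIngestModel` and its methods are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   class SstIngestSketch {
       public static void main(String[] args) {
           Map<String, TreeMap<String, String>> db = new TreeMap<>();
           SstIngestModel writer = new SstIngestModel(db, 8);
           writer.put("default", "a", "1111"); // 5 buffered chars for "default"
           writer.put("default", "b", "2222"); // 10 >= 8: "default" is flushed as one file
           writer.put("other", "a", "33");     // separate buffer per column family
           writer.close();                     // flushes the "other" buffer
           System.out.println(writer.ingestedFiles() + " " + db);
       }
   }

   // Hypothetical model of per-column-family buffering and size-based flushing.
   class SstIngestModel {
       private final Map<String, TreeMap<String, String>> db;
       private final int maxSstSize;
       private final Map<String, List<String[]>> buffers = new HashMap<>();
       private final Map<String, Integer> bufferedSize = new HashMap<>();
       private final Map<String, String> lastKey = new HashMap<>();
       private int ingestedFiles = 0;

       SstIngestModel(Map<String, TreeMap<String, String>> db, int maxSstSize) {
           this.db = db;
           this.maxSstSize = maxSstSize;
       }

       void put(String columnFamily, String key, String value) {
           // An sst file needs keys in strictly increasing order, so out-of-order writes
           // within a column family are rejected up front.
           String previous = lastKey.get(columnFamily);
           if (previous != null && previous.compareTo(key) >= 0) {
               throw new IllegalArgumentException(
                   "keys must be strictly increasing within column family " + columnFamily);
           }
           lastKey.put(columnFamily, key);
           buffers.computeIfAbsent(columnFamily, cf -> new ArrayList<>()).add(new String[] {key, value});
           int size = bufferedSize.merge(columnFamily, key.length() + value.length(), Integer::sum);
           if (size >= maxSstSize) {
               flush(columnFamily);
           }
       }

       // Models "finish the sst file for this column family, then ingest it".
       void flush(String columnFamily) {
           List<String[]> entries = buffers.remove(columnFamily);
           bufferedSize.remove(columnFamily);
           if (entries == null || entries.isEmpty()) {
               return;
           }
           TreeMap<String, String> target = db.computeIfAbsent(columnFamily, cf -> new TreeMap<>());
           for (String[] entry : entries) {
               target.put(entry[0], entry[1]);
           }
           ingestedFiles++;
       }

       void close() {
           // Copy the key set so flush() can remove buffers while we iterate.
           for (String columnFamily : new ArrayList<>(buffers.keySet())) {
               flush(columnFamily);
           }
       }

       int ingestedFiles() {
           return ingestedFiles;
       }
   }
   ```

   Each flush is a separate ingestion, so a failure between two flushes would leave earlier files ingested; this matches the non-atomicity caveat noted in the `RocksDBSSTIngestWriter` Javadoc.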
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   
   - RocksDB savepoint and checkpoint tests.
   
   This change added tests and can be verified as follows:
   
   - Added new tests for the `RocksDBSSTWriter` and `RocksDBSSTIngestWriter`.
   - [ ] **TODO** Add more rigorous tests for the new implementation.
   - [ ] **TODO** Extend existing tests to test both writer implementations.
   - [ ] **TODO** Manually verify the change by running it on a cluster.
   - [ ] **TODO** Write benchmarks for https://github.com/facebook/rocksdb to compare the two writing methods.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes**
     - If yes, how is the feature documented? **TODO: not yet documented**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lgo edited a comment on pull request #12345: draft: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634634096


   Oh sorry, I just realized what you meant. The ticket quoted a bulk-loading FAQ, and I assumed it was about SST ingestion loading all along. I'll create a new one!





[GitHub] [flink] Aitozi commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Aitozi commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-1019084179


   Is there a plan to drive this feature forward? We have applied part of this patch to improve performance in our internal version, so thanks for your effort, @lgo. I think the code is already in good shape, except that it still lacks some benchmarking and testing. I'm glad to help complete this part. If you need any help, please ping me @lgo @Myasuka :)





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * e2a126dd52baa4f41917d9c5490eb0698100f12b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2293) 
   * a735cc4a65a33d011bed5211863b5e430575457d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lgo commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-775169629


   It passes all of the tests in the `flink-statebackend-rocksdb` module. I have not done any other testing, such as setting up an app and experimenting.
   
   I put all of the code behind a configuration option, so both paths can continue to be executed. To test both, I parameterized the RocksDB tests to cover both the WriteBatchWrapper (current) and SstFileWriter (new) ways of writing bulk `put` operations, and duplicated a few tests to trigger both paths.





[GitHub] [flink] Myasuka commented on pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Myasuka commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634451901


   @lgo  Thanks for your contribution. As I suggested in the mail thread, please create another JIRA ticket and link this PR to it.
   BTW, I noticed this PR cannot pass CI because some new files lack license headers. Maybe you could mark this PR as "draft" until it passes CI and is ready for review.





[GitHub] [flink] lgo commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-774844384


   Sorry for dropping progress on these changes @qinjunjerry @Myasuka (life got the better of me for quite a while). I did not end up getting a good production benchmark for the changes here.
   
   I recently had time to rebase these changes onto the latest master. I also split the change to use `deleteRange` into a separate PR (https://github.com/apache/flink/pull/14893), along with an early benchmark for it.
   
   While I was doing a pass on that PR, I stumbled on an earlier ticket and discussion where @sihuazhou previously investigated using the external ingest API to speed these paths up. This is documented in [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845).
   
   Summarizing the details from there, @sihuazhou's initial work and investigation found that RocksDB's Java API for the SST writer had some key performance issues. Specifically, the interface was limited to `put(byte[] key, byte[] value)`, which internally copied memory to construct the RocksDB `DirectSlice`. This added non-trivial overhead and made the Java `SstFileWriter` perform poorly. As a result, they implemented the `RocksDBWriteBatchWrapper` for bulk writes rather than SST file ingestion.
   
   I found the RocksDB issue with a detailed write-up outlining this problem with `SstFileWriter` performance: https://github.com/facebook/rocksdb/issues/2668.
   
   A PR to address this issue, https://github.com/facebook/rocksdb/pull/2283, had been open since 2017 and was only merged in Feb 2020! This change was released as part of the Java API in RocksDB 6.8.0; see the [6.8.0 release notes](https://github.com/facebook/rocksdb/blob/master/HISTORY.md#680-02242020).
   
   Based on @sihuazhou's earlier investigation, I suspect this branch may not bring much of an improvement without upgrading RocksDB to 6.8.0. Given the state of the RocksDB upgrade in https://issues.apache.org/jira/browse/FLINK-14482, I suspect it'll be quite some time (& work) before we get there.





[GitHub] [flink] Myasuka commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Myasuka commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-774912525


   @lgo don't worry about the performance issue, as we plan to upgrade RocksDB to at least 6.10 in Flink 1.13. We could first ensure the SST generator works as expected. Do the current commits work well, apart from the possible performance issue?





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * 0c989e01524236522de0ca5678bc8c219f8d87aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3921) 
   * e8c32516657ef287f9defe7382c0a8fc2b38834a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * 0c989e01524236522de0ca5678bc8c219f8d87aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3921) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * 0c989e01524236522de0ca5678bc8c219f8d87aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3921) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * a735cc4a65a33d011bed5211863b5e430575457d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3917) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lgo edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-774844384


   Sorry for dropping progress on these changes @qinjunjerry @Myasuka (life got the better of me for quite a while). I did not end up getting a good production benchmark for the changes here.
   
   I recently had time to rebase these changes onto the latest master. I also split the change to use `deleteRange` into a separate PR (https://github.com/apache/flink/pull/14893), along with an early benchmark for it.
   
   While I was doing a pass on that PR, I stumbled on an earlier ticket and discussion where @sihuazhou previously investigated using the external ingest API to speed these paths up. This is documented in [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845).
   
   Summarizing the details from there, @sihuazhou's initial work and investigation found that RocksDB's Java API for the SST writer had some key performance issues. Specifically, the interface was limited to `put(byte[] key, byte[] value)`, which internally copied memory to construct the RocksDB `DirectSlice`. This added non-trivial overhead and made the Java `SstFileWriter` perform poorly. As a result, they implemented the `RocksDBWriteBatchWrapper` for bulk writes rather than SST file ingestion.
   
   I found the RocksDB issue with a detailed write-up outlining this problem with `SstFileWriter` performance: https://github.com/facebook/rocksdb/issues/2668.
   
   A PR to address this issue, https://github.com/facebook/rocksdb/pull/2283, had been open since 2017 and was only merged in Feb 2020! This change was released as part of the Java API in RocksDB 6.8.0; see the [6.8.0 release notes](https://github.com/facebook/rocksdb/blob/master/HISTORY.md#680-02242020).
   
   Based on @sihuazhou's earlier investigation, I suspect this branch may not bring much of an improvement without upgrading RocksDB to 6.8.0. Given the state of the RocksDB upgrade in [FLINK-14482](https://issues.apache.org/jira/browse/FLINK-14482), I suspect it'll be quite some time (& work) before we get there.
   
   I'll keep this branch open, given this should be an improvement with all of that, unless there is a preference to close it.





[GitHub] [flink] Aitozi edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Aitozi edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-1019084179









[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * e2a126dd52baa4f41917d9c5490eb0698100f12b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2293) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lgo commented on a change in pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo commented on a change in pull request #12345:
URL: https://github.com/apache/flink/pull/12345#discussion_r430484842



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/writer/RocksDBSSTIngestWriter.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.writer;
+
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.IOUtils;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link RocksDBSSTIngestWriter} implements {@link RocksDBWriter}, providing writes by creating sst
+ * files and instructing {@link RocksDB} to ingest them (via {@link
+ * RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)}).
+ *
+ * <p>{@link RocksDBSSTIngestWriter} supports writing to multiple {@link ColumnFamilyHandle},
+ * assuming the writes within a {@link ColumnFamilyHandle} are ordered.
+ *
+ * <p>It uses {@link RocksDBSSTWriter} for creating the sst file of a {@link ColumnFamilyHandle}.
+ *
+ * <p>IMPORTANT: This class is not thread safe.
+ *
+ * <p>@lgo: describe in detail what this writer is good for (only puts; no more atomicity than the
+ * other writer). Note: this writer's operations are not fully atomic. If a failure occurs between
+ * ingesting two files (i.e. between flushes), the RocksDB state will be left with partial writes. It
+ * is unclear whether this is a problem for Flink, but it should be tested.
+ */
+public class RocksDBSSTIngestWriter implements RocksDBWriter {
+
+	/**
+	 * {@code INGEST_EXTERNAL_FILE_OPTIONS} are the {@link IngestExternalFileOptions} provided to
+	 * RocksDB when calling {@link RocksDB#ingestExternalFile(ColumnFamilyHandle, List,
+	 * IngestExternalFileOptions)}. We only use one set of options, so we use a singleton that is not
+	 * closed.
+	 */
+	// @lgo: because Flink uses RocksDB JNI 5.14, the IngestExternalFileOptions API is limited
+	// and does not support the builder pattern.
+	private static final IngestExternalFileOptions INGEST_EXTERNAL_FILE_OPTIONS =
+		new IngestExternalFileOptions(
+			// Move files rather than copying them. Because we are generating
+			// one-off files for import, it does not matter if they are moved and
+			// the import fails.
+			//
+			// @lgo: ensure this does not cause stray RocksDB sst files on a host when ingestion
+			// fails.
+			/* moveFiles */ true,
+
+			// Set to the default (true).
+			/* snapshotConsistency */ true,
+
+			// Set to the default (true).
+			/* allowGlobalSeqNo */ true,
+
+			// By disallowing a blocking flush, ingestion loudly fails when it
+			// has keys that overlap the RocksDB memtable.
+			/* allowBlockingFlush */ false);
+
+	/** {@code maxSstSize} is the maximum size of an sst file before flushing it. */
+	private final int maxSstSize;
+
+	/**
+	 * {@code envOptions} are the {@link EnvOptions} provided to the underlying {@link
+	 * RocksDBSSTWriter}.
+	 */
+	// @lgo: plumb through the options into the constructor
+	// and set sane options.
+	private final EnvOptions envOptions;
+
+	/**
+	 * {@code options} are the {@link Options} provided to the underlying {@link RocksDBSSTWriter}.
+	 */
+	// @lgo: plumb through the options into the constructor
+	// and set sane options.
+	private final Options options;
+
+	/** {@code db} is the active RocksDB database to write files to. */
+	private final RocksDB db;
+
+	/**
+	 * {@code columnFamilyWriters} contains the currently open {@link RocksDBSSTWriter} for each {@link
+	 * ColumnFamilyHandle}, keyed by column family ID.
+	 */
+	private final HashMap<Integer, RocksDBSSTWriter> columnFamilyWriters = new HashMap<>();
+
+	/**
+	 * {@code ingestionTempDir} is the directory used to write temporary sst files before ingesting
+	 * them into {@link RocksDB}.
+	 */
+	private final File ingestionTempDir;
+
+	public RocksDBSSTIngestWriter(
+		@Nonnull RocksDB rocksDB,
+		@Nonnegative int maxSstSize,
+		@Nullable EnvOptions envOptions,
+		@Nullable Options options,
+		@Nullable File tempDir)
+		throws IOException {
+		this.db = rocksDB;
+		this.maxSstSize = maxSstSize;
+		this.envOptions = envOptions;
+		this.options = options;
+
+		// Set up a temporary directory for writing generated SST files.
+		// Either use the provided temporary directory (such as during tests), or create a new one.
+		if (tempDir != null) {
+			this.ingestionTempDir = tempDir;
+		} else {
+			// @lgo: clean the temporary folder up. Leaving it behind may be acceptable, since
+			// ingestion moves the temporary sst files out of it.
+			this.ingestionTempDir = Files.createTempDirectory("rocksdb-sst-writer-temp-").toFile();
+		}
+	}
+
+	public void put(
+		@Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull byte[] key, @Nonnull byte[] value)
+		throws RocksDBException, IOException {
+		// Get the sst writer for the column family.
+		RocksDBSSTWriter writer = ensureSSTableWriter(columnFamilyHandle);
+		// Insert the k/v.
+		writer.put(key, value);
+		// Flush the sst and ingest it, if it needs to be flushed.
+		flushIfNeeded(columnFamilyHandle, writer);
+	}
+
+	private RocksDBSSTWriter ensureSSTableWriter(@Nonnull ColumnFamilyHandle columnFamilyHandle)
+		throws RocksDBException, IOException {
+		// Return the existing writer if there is one; otherwise, prepare a new one.
+		if (columnFamilyWriters.containsKey(columnFamilyHandle.getID())) {
+			return columnFamilyWriters.get(columnFamilyHandle.getID());
+		}
+
+		// Create a new sst file in the temporary folder.
+		final String sstFileName = "ingest_" + new AbstractID() + ".sst";
+		File sstFile = new File(ingestionTempDir, sstFileName);
+
+		// Initialize the sst writer.
+		RocksDBSSTWriter writer =
+			new RocksDBSSTWriter(envOptions, options, columnFamilyHandle, sstFile);
+
+		// Store the new writer for the column family.
+		columnFamilyWriters.put(columnFamilyHandle.getID(), writer);
+
+		return writer;
+	}
+
+	/**
+	 * Flushes the data for a particular {@link RocksDBSSTWriter} into the {@link RocksDB} instance.
+	 * After flushing, the {@link ColumnFamilyHandle} will not have an active {@link
+	 * RocksDBSSTWriter}, and it will need to be initialized by {@link
+	 * #ensureSSTableWriter(ColumnFamilyHandle)}.
+	 *
+	 * @throws RocksDBException if finishing the sst file or ingesting it into RocksDB fails.
+	 */
+	private void flushAndCloseWriter(@Nonnull RocksDBSSTWriter writer) throws RocksDBException {
+		// Finish the sst writer.
+		writer.finish();
+
+		// Instruct RocksDB to ingest the sst file. Because the ingested files belong to
+		// different column families, and the JNI SstFileWriter does not allow setting the
+		// column family when writing the file, we need to call ingestExternalFile for one
+		// sst file at a time.
+		//
+		// Because the IngestExternalFileOptions specifies to move the sst file, we do not need
+		// to clean up the written file.
+		List<String> files = Collections.singletonList(writer.getFile().getAbsolutePath());
+		db.ingestExternalFile(writer.getColumnFamilyHandle(), files, INGEST_EXTERNAL_FILE_OPTIONS);

Review comment:
       TODO: I believe this needs `ColumnFamilyOptions` to be passed through. If not here, somewhere else on this write-path (such as via `Options`).
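To make the assumptions of the `put` path above concrete, here is a dependency-free sketch that models it without RocksDB. It is hypothetical: `SstIngestModel` and `PendingFile` are illustrative names, the size-based flush rule is an assumed policy (the diff calls `flushIfNeeded`, whose body is not shown here), and "ingestion" is simulated by recording how many entries each flushed file held. It keeps at most one open file per column family ID, rejects keys that are not strictly increasing within a file (compared as unsigned bytes, like RocksDB's default bytewise comparator, since `SstFileWriter` requires sorted input), and flushes once the buffered key and value bytes reach `maxSstSize`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of RocksDBSSTIngestWriter#put: no RocksDB calls are made, and
// "ingestion" is simulated by recording how many entries each flushed file held.
public class SstIngestModel {

    // Stands in for one open RocksDBSSTWriter (one pending sst file).
    static final class PendingFile {
        int entries;
        long bytes;
        byte[] lastKey;
    }

    private final long maxSstSize;
    private final Map<Integer, PendingFile> columnFamilyWriters = new HashMap<>();
    // Column family ID -> entry counts of each simulated ingestion, in order.
    public final Map<Integer, List<Integer>> ingested = new HashMap<>();

    public SstIngestModel(long maxSstSize) {
        this.maxSstSize = maxSstSize;
    }

    // Unsigned lexicographic order, like RocksDB's default bytewise comparator.
    public static int compareBytewise(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public void put(int columnFamilyId, byte[] key, byte[] value) {
        // Reuse the open file for this column family, or start a new one.
        PendingFile file = columnFamilyWriters.computeIfAbsent(columnFamilyId, id -> new PendingFile());
        // Keys within one sst file must be strictly increasing.
        if (file.lastKey != null && compareBytewise(file.lastKey, key) >= 0) {
            throw new IllegalArgumentException("keys must be strictly increasing within a file");
        }
        file.entries++;
        file.lastKey = key;
        file.bytes += key.length + value.length;
        // Assumed policy: flush once buffered key + value bytes reach maxSstSize.
        if (file.bytes >= maxSstSize) {
            flush(columnFamilyId);
        }
    }

    // Simulates finish() + ingestExternalFile(); afterwards the column family has no open file.
    public void flush(int columnFamilyId) {
        PendingFile file = columnFamilyWriters.remove(columnFamilyId);
        if (file != null && file.entries > 0) {
            ingested.computeIfAbsent(columnFamilyId, id -> new ArrayList<>()).add(file.entries);
        }
    }

    // Flushes every column family that still has an open file.
    public void flushAll() {
        for (Integer id : new ArrayList<>(columnFamilyWriters.keySet())) {
            flush(id);
        }
    }
}
```

In this model ordering is only checked within a single file, and anything still buffered at the end is only ingested by an explicit `flushAll()`; a real writer would presumably need the equivalent when it is closed.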

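The constructor above leaves a TODO about cleaning up the temporary directory it creates with `Files.createTempDirectory`. One possible approach, sketched here as a hypothetical helper (`TempDirCleanup` is not part of the PR), is to delete that directory recursively when the writer is closed. Because the ingestion options move successfully ingested files out, this would mainly remove sst files left behind when an ingestion fails.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for the "clean the temporary folder up" TODO; not part of the PR.
public final class TempDirCleanup {

    private TempDirCleanup() {}

    // Same prefix the constructor in the diff passes to Files.createTempDirectory.
    public static Path createIngestionTempDir() throws IOException {
        return Files.createTempDirectory("rocksdb-sst-writer-temp-");
    }

    // Deletes dir and anything still inside it, e.g. sst files left behind by a failed ingestion.
    public static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return; // nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }
}
```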



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * e2a126dd52baa4f41917d9c5490eb0698100f12b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2293) 
   * a735cc4a65a33d011bed5211863b5e430575457d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3917) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * ece18a51ce550ce1f42d09597392584872a42e3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2218) 
   * e2a126dd52baa4f41917d9c5490eb0698100f12b UNKNOWN
   



[GitHub] [flink] qinjunjerry commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
qinjunjerry commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-727624499


   @lgo Have you evaluated the performance improvement of the change?





[GitHub] [flink] Aitozi edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
Aitozi edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-1019084179


   Do we have any plans to drive this feature forward? I have applied part of this patch to improve performance in our internal version, so thanks for your effort, @lgo. I think the code is already in good shape, apart from lacking benchmark tests. I'm glad to help complete this part; if you need any help, please ping me @lgo @Myasuka :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lgo commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
lgo commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-678368305


   Unfortunately not. The changes should be code complete, but I have not yet found the time to test and evaluate performance on a running cluster; I had backported them to 1.9.1 for that purpose.
   
   Thanks for the reminder on this, though. I'm going to try to do that next week when I've got a bunch of free time.





[GitHub] [flink] Myasuka commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
Myasuka commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-971262295


   @lgo Do you have plans to update this, now that we have upgraded to RocksDB 6.20.3?





[GitHub] [flink] lgo edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
lgo edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-774844384


   Sorry about having dropped the progress on these changes @qinjunjerry @Myasuka (life got the better of me for quite a while). I did not end up getting a good production benchmark for the changes here.
   
   I recently had the time to rebase these changes onto the latest master. I also carved out the change to use `deleteRange` into a separate PR (https://github.com/apache/flink/pull/14893), along with an early benchmark for it.
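To illustrate what the `deleteRange` change replaces, the sketch below models both approaches on an in-memory `TreeMap` ordered by unsigned bytes, not on RocksDB itself; `RangeDeleteModel` and its methods are hypothetical names. Both clear the half-open range [begin, end), matching `RocksDB#deleteRange`, which includes the begin key and excludes the end key. In RocksDB, the loop reads every key in the range and writes one point delete per key, whereas `deleteRange` writes a single range tombstone.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory model (a TreeMap, not RocksDB) of the two ways to clear [begin, end).
public final class RangeDeleteModel {

    private RangeDeleteModel() {}

    // Unsigned lexicographic order, like RocksDB's default bytewise comparator.
    public static int compareBytewise(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static NavigableMap<byte[], byte[]> newStore() {
        return new TreeMap<byte[], byte[]>(RangeDeleteModel::compareBytewise);
    }

    // Old pattern: seek to begin, then delete key by key until reaching end.
    public static int iterateAndDelete(NavigableMap<byte[], byte[]> store, byte[] begin, byte[] end) {
        int deleted = 0;
        Iterator<Map.Entry<byte[], byte[]>> it = store.tailMap(begin, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<byte[], byte[]> entry = it.next();
            if (compareBytewise(entry.getKey(), end) >= 0) {
                break; // end is exclusive
            }
            it.remove();
            deleted++;
        }
        return deleted;
    }

    // New pattern: one call over [begin, end), analogous to RocksDB#deleteRange.
    public static void deleteRange(NavigableMap<byte[], byte[]> store, byte[] begin, byte[] end) {
        store.subMap(begin, true, end, false).clear();
    }
}
```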
   
   While I was doing a pass on that PR, I stumbled on an earlier ticket and discussion where @sihuazhou previously investigated using the external ingest API to speed these paths up. This is documented in [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845).
   
   Summarizing the details from there, @sihuazhou's initial work and investigation found that RocksDB's Java API for the SST writer had some key performance issues. Specifically, the interface was limited to `put(byte[] key, byte[] value)` and internally copied memory to construct the RocksDB `DirectSlice`. This added non-trivial overhead, giving the Java `SstFileWriter` poor performance. As a result, they implemented the `RocksDBWriteBatchWrapper` for bulk writes rather than SST file ingestion.
   
   I found the RocksDB issue with a detailed write-up outlining this problem with `SstFileWriter` performance: https://github.com/facebook/rocksdb/issues/2668.
   
   Since then, a PR (https://github.com/facebook/rocksdb/pull/2283) was opened to address this issue, but it had been left open since 2017 and only got merged in Feb 2020! The change was released as part of the Java API in RocksDB 6.8.0; see the [6.8.0 release notes](https://github.com/facebook/rocksdb/blob/master/HISTORY.md#680-02242020).
   
   Based on @sihuazhou's earlier investigation, I suspect this branch may not bring much of an improvement without upgrading RocksDB to 6.8.0. Given the state of the RocksDB upgrade in [FLINK-14482](https://issues.apache.org/jira/browse/FLINK-14482), I suspect it'll be quite some time (and work) before we get there.





[GitHub] [flink] flinkbot commented on pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634023655









[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * a735cc4a65a33d011bed5211863b5e430575457d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3917) 
   * 0c989e01524236522de0ca5678bc8c219f8d87aa UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * e8c32516657ef287f9defe7382c0a8fc2b38834a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13077) 
   



[GitHub] [flink] Myasuka commented on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
Myasuka commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-678116971


   @lgo Have you completed the PR yet?





[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * c8444411fdf4ba5d22091742e8ec46d6ac943305 UNKNOWN
   * ece18a51ce550ce1f42d09597392584872a42e3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2218) 
   * e2a126dd52baa4f41917d9c5490eb0698100f12b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2293) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lgo commented on pull request #12345: draft: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
lgo commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634634096


   Oh, sorry, I just realized what you meant. The ticket quoted a bulk-loading FAQ, and I assumed it was about SST ingestion loading all along. I'll create a new one!





[GitHub] [flink] lgo commented on pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
lgo commented on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634628869


   @Myasuka Actually, this does exactly what the bulk-loading ticket describes. `RocksDBSSTIngestWriter` is a drop-in replacement for `RocksDBWriteBatchWrapper` wherever only `RocksDB#put` is used, so this implements bulk loading.
   
   Right now, the PR switches two call sites to it: full savepoint recovery and state migration (`migrateStateValues`). Both obtain their writer through `RocksDBWriterFactory#defaultPutWriter`, which returns the new `RocksDBSSTIngestWriter` when it has been configured.
   
   Sorry, I'll update the description to explain this more clearly. This is definitely a draft, so I'll go ahead and add that tag (:
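   The drop-in design described above (one put-only writer contract, two implementations, a factory that picks one based on configuration) can be sketched in plain Java. This is a minimal, hypothetical model, not the PR's code: `PutWriter`, `BatchPutWriter`, `SstIngestPutWriter` and `PutWriterFactory.defaultPutWriter` are illustrative names loosely modeled on the PR's `RocksDBWriter` interface and the factory's `defaultPutWriter`, a `Map` stands in for the RocksDB instance, and strings stand in for serialized keys. In the real backend the SST path would go through RocksDB's `SstFileWriter` (which requires keys in strictly increasing order) and `RocksDB#ingestExternalFile`; the sketch only models that behavior in memory.

   ```java
   import java.util.AbstractMap;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   /** Hypothetical put-only contract: the bulk-load call sites never read or delete. */
   interface PutWriter extends AutoCloseable {
       void put(String key, String value);

       /** Makes all buffered puts visible in the store (no checked exception, for simplicity). */
       @Override
       void close();
   }

   /** Write-batch style: collect puts in arrival order and apply them every batchSize entries. */
   final class BatchPutWriter implements PutWriter {
       private final Map<String, String> store;
       private final int batchSize;
       private final List<Map.Entry<String, String>> batch = new ArrayList<>();

       BatchPutWriter(Map<String, String> store, int batchSize) {
           this.store = store;
           this.batchSize = batchSize;
       }

       @Override
       public void put(String key, String value) {
           batch.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
           if (batch.size() >= batchSize) {
               flush();
           }
       }

       private void flush() {
           // Apply in arrival order so a later put of the same key wins.
           for (Map.Entry<String, String> e : batch) {
               store.put(e.getKey(), e.getValue());
           }
           batch.clear();
       }

       @Override
       public void close() {
           flush();
       }
   }

   /**
    * SST-ingestion style (modeled in memory): an SST file needs sorted keys, so puts are
    * buffered in a TreeMap; each full buffer stands in for one finished file that is
    * then "ingested" into the store in a single step.
    */
   final class SstIngestPutWriter implements PutWriter {
       private final Map<String, String> store;
       private final int maxEntriesPerFile;
       private TreeMap<String, String> currentFile = new TreeMap<>();
       private int ingestedFiles = 0;

       SstIngestPutWriter(Map<String, String> store, int maxEntriesPerFile) {
           this.store = store;
           this.maxEntriesPerFile = maxEntriesPerFile;
       }

       @Override
       public void put(String key, String value) {
           currentFile.put(key, value); // a repeated key overwrites, matching put semantics
           if (currentFile.size() >= maxEntriesPerFile) {
               ingestCurrentFile();
           }
       }

       private void ingestCurrentFile() {
           if (currentFile.isEmpty()) {
               return; // nothing to write, so no file is created
           }
           // A file ingested later is newer, so its values override earlier ones.
           store.putAll(currentFile);
           ingestedFiles++;
           currentFile = new TreeMap<>();
       }

       int ingestedFiles() {
           return ingestedFiles;
       }

       @Override
       public void close() {
           ingestCurrentFile();
       }
   }

   /** Hypothetical factory: callers ask for "a put writer" and configuration picks the strategy. */
   final class PutWriterFactory {
       private PutWriterFactory() {}

       static PutWriter defaultPutWriter(boolean sstIngestEnabled, Map<String, String> store, int bufferSize) {
           return sstIngestEnabled
                   ? new SstIngestPutWriter(store, bufferSize)
                   : new BatchPutWriter(store, bufferSize);
       }
   }
   ```

   Because both writers expose only `put` and `close`, a call site such as savepoint restore stays unchanged and only the factory decision differs; both paths should leave the store in the same final state, with the SST variant trading per-batch writes for in-memory sorting plus whole-file ingestion.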





[GitHub] [flink] flinkbot edited a comment on pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354









[GitHub] [flink] flinkbot edited a comment on pull request #12345: draft: [FLINK-17971] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12345:
URL: https://github.com/apache/flink/pull/12345#issuecomment-634034354


   ## CI report:
   
   * e8c32516657ef287f9defe7382c0a8fc2b38834a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13077) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

