Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/27 10:28:08 UTC

[GitHub] [beam] mxm opened a new pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

mxm opened a new pull request #11533:
URL: https://github.com/apache/beam/pull/11533


   To compute the current output watermark, we need the minimum across all
   watermark holds. We maintain an in-memory view of all watermark holds because
   iterating over all keys is an expensive operation, which we only want to do
   when restoring from a checkpoint, not on every watermark.
   
   The watermark view may not be in sync with its backing managed state. This
   change corrects that by maintaining a priority queue of all watermark holds
   and by adding tests.
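
   The priority-queue view described above could look roughly like the sketch below. This is only an illustration under stated assumptions, not the code in this pull request: the class `WatermarkHoldQueue` and its method names are hypothetical, and holds are assumed to be stored as epoch milliseconds with `Long.MAX_VALUE` standing in for "no hold".

```java
import java.util.PriorityQueue;

/** Hypothetical in-memory view of watermark holds; not the actual FlinkStateInternals code. */
final class WatermarkHoldQueue {
  // Min-heap of hold timestamps (epoch millis), one entry per key/window hold.
  private final PriorityQueue<Long> holds = new PriorityQueue<>();

  /** Registers a hold. O(log n). */
  void addHold(long timestampMillis) {
    holds.add(timestampMillis);
  }

  /** Clears one occurrence of a hold. PriorityQueue.remove(Object) is O(n). */
  void clearHold(long timestampMillis) {
    holds.remove(Long.valueOf(timestampMillis));
  }

  /** Returns the minimum hold, or Long.MAX_VALUE (assumed sentinel) if there is none. O(1). */
  long minHold() {
    Long head = holds.peek();
    return head == null ? Long.MAX_VALUE : head;
  }
}
```

   Because every hold stays in the queue, clearing the current minimum immediately exposes the next smallest hold without touching the state backend.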
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-620017442









[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-620148507


   CC @tweise 





[GitHub] [beam] dmvk commented on a change in pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #11533:
URL: https://github.com/apache/beam/pull/11533#discussion_r415735715



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -76,8 +76,8 @@
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
 
-  // Combined watermark holds for all keys of this partition
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+  // Watermark holds for all keys/windows of this partition
+  private final PriorityQueue<Long> watermarkHolds = new PriorityQueue<>();

Review comment:
       To sum up, if I understand this correctly:
   - We only need to keep a single value (the minimum)
   - We cannot use a scalar value to hold the minimum, because we need a mutable wrapper
   
   In this case, can we keep only the minimum value in order to have O(log N) complexity? E.g.:
   ```
   <T extends Comparable<T>> void insertAndKeepMinimum(PriorityQueue<T> pq, T element) {
     // Replace the single retained element only if the new one is smaller.
     T min = pq.peek();
     if (min == null || element.compareTo(min) < 0) {
       pq.clear();
       pq.add(element);
     }
   }
   ```
   
   `pq.remove(Object)` has linear time complexity.







[GitHub] [beam] dmvk commented on a change in pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #11533:
URL: https://github.com/apache/beam/pull/11533#discussion_r415746821



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -76,8 +76,8 @@
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
 
-  // Combined watermark holds for all keys of this partition
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+  // Watermark holds for all keys/windows of this partition
+  private final PriorityQueue<Long> watermarkHolds = new PriorityQueue<>();

Review comment:
       Hmm, I see it now, `FlinkWatermarkHoldState` is per window :/







[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-620017337


   Run Flink ValidatesRunner





[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-619888933


   Run Flink ValidatesRunner





[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-620038983


   Run Portable_Python PreCommit





[GitHub] [beam] mxm commented on a change in pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11533:
URL: https://github.com/apache/beam/pull/11533#discussion_r415802522



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -76,8 +76,8 @@
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
 
-  // Combined watermark holds for all keys of this partition
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+  // Watermark holds for all keys/windows of this partition
+  private final PriorityQueue<Long> watermarkHolds = new PriorityQueue<>();

Review comment:
       Keeping only the minimum doesn't work. When the minimum hold is cleared, we would have to recompute the minimum by iterating over all the keys in the state backend, which would defeat the purpose of having the cache.







[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-619889038


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] mxm commented on a change in pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11533:
URL: https://github.com/apache/beam/pull/11533#discussion_r415851980



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -76,8 +76,8 @@
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
 
-  // Combined watermark holds for all keys of this partition
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+  // Watermark holds for all keys/windows of this partition
+  private final PriorityQueue<Long> watermarkHolds = new PriorityQueue<>();

Review comment:
       Good idea. I've changed it to use a TreeMap.







[GitHub] [beam] dmvk commented on a change in pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #11533:
URL: https://github.com/apache/beam/pull/11533#discussion_r415812145



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -76,8 +76,8 @@
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
 
-  // Combined watermark holds for all keys of this partition
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+  // Watermark holds for all keys/windows of this partition
+  private final PriorityQueue<Long> watermarkHolds = new PriorityQueue<>();

Review comment:
       👍 that makes sense
   
   It would be nice if we could get rid of the `pq.remove(...)` calls, as these are O(n) and the number of keys may be fairly large. How about using `TreeMap<Long, Integer>` instead, where the value would be the number of references to that particular offset?
   
   This should have O(log n) characteristics and may also deduplicate some of the entries.
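
   A minimal sketch of the reference-counted `TreeMap<Long, Integer>` idea suggested above, assuming holds are epoch-millisecond timestamps. The class `RefCountedHolds`, its method names, and the `Long.MAX_VALUE` sentinel are hypothetical and are not taken from the pull request.

```java
import java.util.TreeMap;

/** Hypothetical reference-counted view of watermark holds; names are illustrative only. */
final class RefCountedHolds {
  // Hold timestamp (epoch millis) -> number of keys/windows currently holding at it.
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  /** Adds one reference to the given hold. O(log n). */
  void addHold(long timestampMillis) {
    counts.merge(timestampMillis, 1, Integer::sum);
  }

  /** Drops one reference; the entry disappears when its count reaches zero. O(log n). */
  void clearHold(long timestampMillis) {
    // Returning null from the remapping function removes the mapping.
    counts.computeIfPresent(timestampMillis, (ts, count) -> count > 1 ? count - 1 : null);
  }

  /** Returns the minimum hold, or Long.MAX_VALUE (assumed sentinel) if there is none. */
  long minHold() {
    return counts.isEmpty() ? Long.MAX_VALUE : counts.firstKey();
  }

  /** Number of distinct hold timestamps currently tracked. */
  int distinctHolds() {
    return counts.size();
  }
}
```

   Holds that share a timestamp collapse into one entry, so memory grows with the number of distinct timestamps rather than with the number of keys and windows.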







[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-619905786


   This fixes the flaky test in BEAM-9827.





[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-619889167


   Run Nexmark Flink





[GitHub] [beam] mxm commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-620039210


   Thanks for the review @dmvk!





[GitHub] [beam] dmvk commented on pull request #11533: [BEAM-9827] Ensure minimum watermark hold is computed across all keys

Posted by GitBox <gi...@apache.org>.
dmvk commented on pull request #11533:
URL: https://github.com/apache/beam/pull/11533#issuecomment-619987249


   👍 This change makes sense. I'm a little bit unsure about the memory requirements here, but hopefully storing an offset per key / window should be OK.

