Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/22 22:42:30 UTC

[GitHub] [beam] dmvk commented on a change in pull request #15994: [BEAM-13263] Support OnWindowExpiration in (non-portable) Flink runner

dmvk commented on a change in pull request #15994:
URL: https://github.com/apache/beam/pull/15994#discussion_r754658243



##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
##########
@@ -52,6 +52,8 @@
       Instant outputTimestamp,
       TimeDomain timeDomain);
 
+  <KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);

Review comment:
       ```suggestion
     /** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
     <KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);
   ```
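
   For context (just a hedged illustration, not code from this PR): the runner-side `onWindowExpiration(window, outputTimestamp, key)` ultimately dispatches to the user's `@DoFn.OnWindowExpiration` method. A minimal stateful `DoFn` using it could look roughly like this (the class, state id, and parameter names are made up for the example):

```java
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** A stateful DoFn that flushes its buffered sum when the window expires. */
public class SumPerKeyFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("sum")
  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> element, @StateId("sum") ValueState<Long> sum) {
    Long current = sum.read();
    sum.write((current == null ? 0L : current) + element.getValue());
  }

  // Invoked by the runner just before the window's state is garbage-collected;
  // this is what the runner-side onWindowExpiration(...) call plumbs through to.
  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("sum") ValueState<Long> sum,
      @Key String key,
      OutputReceiver<KV<String, Long>> output) {
    Long current = sum.read();
    if (current != null) {
      output.output(KV.of(key, current));
    }
  }
}
```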

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       This will materialize all of the keys in memory, which could be a problem with really large state (when using RocksDB).
   
   Also, since we're collecting the keys into a list, there will be duplicates if we have multiple states per key 🤔 (so we could expire a single window multiple times).
   
   That being said, I don't know how to solve the "materialization" problem in combination with de-duplication :( (it would be fairly easy if we had only a single descriptor).
   
   So for now we should at least replace the list with a set to address the possible duplicates (see the sketch below). Also, adding a failing test case for this would be great.
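
   A rough sketch of the set-based variant, purely as an illustration under assumptions: the descriptor bookkeeping and the GlobalWindow namespace argument are made up, since the method body isn't visible in this hunk; `KeyedStateBackend#getKeys` is the real Flink API.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.flink.runtime.state.KeyedStateBackend;

public class GlobalWindowStateKeys {

  /**
   * Collects the distinct keys that have state registered under any of the given state names.
   * Returning a Set instead of a List means a key that appears under several descriptors is
   * reported (and therefore expired) only once.
   */
  public static Set<ByteBuffer> getGlobalWindowStateKeys(
      KeyedStateBackend<ByteBuffer> backend,
      Collection<String> stateNames, // hypothetical bookkeeping of registered descriptors
      String globalWindowNamespace) { // hypothetical encoded GlobalWindow namespace
    Set<ByteBuffer> keys = new HashSet<>();
    for (String stateName : stateNames) {
      // getKeys returns a lazy Stream, but collecting it still materializes all keys
      // in memory, so the RocksDB concern above is not addressed by this change alone.
      try (Stream<ByteBuffer> keyStream = backend.getKeys(stateName, globalWindowNamespace)) {
        keyStream.forEach(keys::add);
      }
    }
    return keys;
  }
}
```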

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -791,11 +796,30 @@ private long computeOutputWatermark(long inputWatermarkHold) {
 
   private void maybeEmitWatermark(long watermark) {
     if (watermark > currentOutputWatermark) {
+      // If this is the end of the global window, then call onWindowExpiration callbacks. For other
+      // windows, this will
+      // be called as part of the garbage-collection timer.

Review comment:
       nit
   ```suggestion
         // windows, this will be called as part of the garbage-collection timer.
   ```

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       Hmm, on second thought, we could probably simply pick a single state descriptor that we know is always present (one of the system ones, e.g. the watermark hold). Then we should be able to expire the keys one by one, roughly along the lines of the sketch below.
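
   Just a sketch (the state name and namespace arguments are placeholders; whether state may safely be cleared while the key stream is still open would need to be verified):

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.flink.runtime.state.KeyedStateBackend;

public class GlobalWindowExpiration {

  /**
   * Streams the keys of one always-present descriptor (e.g. the watermark hold) and invokes
   * the expiration callback per key, avoiding both full materialization and duplicates.
   */
  public static void expireGlobalWindowKeys(
      KeyedStateBackend<ByteBuffer> backend,
      String watermarkHoldStateName, // placeholder for the system descriptor's name
      String globalWindowNamespace, // placeholder for the encoded GlobalWindow namespace
      Consumer<ByteBuffer> onWindowExpiration) {
    try (Stream<ByteBuffer> keys = backend.getKeys(watermarkHoldStateName, globalWindowNamespace)) {
      keys.forEach(onWindowExpiration);
    }
  }
}
```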

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       nit
   ```suggestion
     @SuppressWarnings("unchecked")
     public List<ByteBuffer> getGlobalWindowStateKeys() {
   ```

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
##########
@@ -186,6 +201,12 @@ public TimerInternals timerInternals() {
 
     fireEligibleTimers(key, timerInternals, doFnRunner);
 
+    if (usesOnWindowExpiration) {
+      for (BoundedWindow window : windowsSeen) {

Review comment:
       My only concern here would be that all the windows for a single key need to fit in memory. For example, a year's worth of 1-minute tumbling windows would be ~500k windows (365 × 24 × 60 ≈ 525,600), which IMO could be quite common in the batch case.
   
   What about doing a simple optimization here by reusing the timestamp-sorting mechanism that we already have in place for `@RequiresTimeSortedInput` [1]?
   
   Then we could simply check for expiring windows after each element, i.e. every time the timestamp progresses (see the sketch below).
   
   [1] https://github.com/apache/beam/blob/v2.34.0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L702
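
   As an illustration only (a self-contained sketch, not the runner's actual classes; the expiration condition ignores allowed lateness and assumes windows for a key have distinct max timestamps, e.g. tumbling windows):

```java
import java.util.Comparator;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** Tracks open windows per key and fires expiration as the time-sorted input's timestamps advance. */
public class WindowExpirationTracker<KeyT> {

  // Ordered by window end; assumes distinct maxTimestamps per key (true for tumbling windows).
  private final TreeSet<BoundedWindow> openWindows =
      new TreeSet<>(Comparator.comparing(BoundedWindow::maxTimestamp));
  private final BiConsumer<KeyT, BoundedWindow> onWindowExpiration;

  public WindowExpirationTracker(BiConsumer<KeyT, BoundedWindow> onWindowExpiration) {
    this.onWindowExpiration = onWindowExpiration;
  }

  /** Called for every element of the key, in timestamp order. */
  public void onElement(KeyT key, BoundedWindow window, Instant elementTimestamp) {
    // Expire windows that ended before the current timestamp; only the still-open windows
    // stay in memory, instead of every window ever seen for the key.
    while (!openWindows.isEmpty()
        && openWindows.first().maxTimestamp().isBefore(elementTimestamp)) {
      onWindowExpiration.accept(key, openWindows.pollFirst());
    }
    openWindows.add(window);
  }

  /** Called once the key's input is exhausted; expires whatever is still open. */
  public void onKeyFinished(KeyT key) {
    for (BoundedWindow window : openWindows) {
      onWindowExpiration.accept(key, window);
    }
    openWindows.clear();
  }
}
```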




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org