Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/22 00:11:05 UTC

[GitHub] [beam] reuvenlax opened a new pull request #15056: [BEAM-10887] Timer clear and watermark

reuvenlax opened a new pull request #15056:
URL: https://github.com/apache/beam/pull/15056


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660926508



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       I see. If that is the case then great; we should cover those cases with TestStream, as I described below.
   
   I didn't see where we had reached consensus on fixing this, so, based on what I saw the code doing here, I assumed we had gone with it not being visible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] reuvenlax commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r663296194



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -173,48 +174,73 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-
-      final Instant inputWatermarkTime = timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
-      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
+      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
+        toBeFiredTimers.add(timerData);
+        switch (timerData.getDomain()) {
+          case EVENT_TIME:
+            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
+            break;
+          case PROCESSING_TIME:
+            maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
+            break;
+          case SYNCHRONIZED_PROCESSING_TIME:
+            maxSynchronizedProcessingTime =
+                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+        }
+      }
+
+      while (!timerInternals.containsUpdateForTimeBefore(
+              maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
           && !toBeFiredTimers.isEmpty()) {
+
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
+            "Expected Timer %s to be in a        %s, but got %s",

Review comment:
       Done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       Will add this test to the follow-on PR, as it will fail until that is in.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears a timer. */

Review comment:
       Done

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -670,7 +670,8 @@ private synchronized void updateTimers(TimerUpdate update) {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId());
-
+        System.err.println(

Review comment:
       Done







[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear and watermark

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-865429024


   This is an attempt to revive @boyuanzz's previous PR, adding Timer.clear(). @lukecwik had some semantic questions. I believe that at this point the behavior of timers overridden in the same bundle has been well discussed, and the desire is for the override to win. Users should not need to know how elements/timers are bundled in order to use them. (NB: the correct behavior is currently broken for the Dataflow runner. There was an attempt to fix it last year, but that PR was rolled back and never resubmitted.)





[GitHub] [beam] lukecwik commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r665558339



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       sg

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       sg







[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-875850969


   This PR seems to have included some unrelated changes. Will roll back and resubmit.
   





[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-875767758


   friendly ping





[GitHub] [beam] reuvenlax merged pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #15056:
URL: https://github.com/apache/beam/pull/15056


   





[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-873808460


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660780333



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears a timer. */

Review comment:
       ```suggestion
     /** Previously set timers will become unset. */
   ```

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -670,7 +670,8 @@ private synchronized void updateTimers(TimerUpdate update) {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId());
-
+        System.err.println(

Review comment:
       drop debugging statement

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       Should we cover the case where a timer becomes eligible and still fires even though it is cleared in the same bundle?
   
   e.g.
   set timer A for 1, set timer B for 2, advance time to 3, and have timer A's callback clear B; B still fires since it is part of the same bundle
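
   The scenario above can be sketched without any Beam dependency. This is only a toy model under stated assumptions, not the direct runner's implementation: the class TimerClearSketch, the method fireBundle, and the overrideWins flag are hypothetical names introduced for illustration. It contrasts a runner that fires every timer that was eligible at bundle start with one that honors an in-bundle clear.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model; none of these names come from Beam.
class TimerClearSketch {
  /**
   * Fires the timers whose timestamp is <= now, in timestamp order.
   * Timer A's callback clears timer B. If overrideWins is false, the set of
   * timers to fire is fixed at bundle start and later clears are ignored.
   */
  static List<String> fireBundle(TreeMap<Long, String> timers, long now, boolean overrideWins) {
    // Snapshot of timers eligible at bundle start, ordered by timestamp.
    List<String> eligible = new ArrayList<>(timers.headMap(now, true).values());
    Set<String> cleared = new HashSet<>();
    List<String> fired = new ArrayList<>();
    for (String id : eligible) {
      if (overrideWins && cleared.contains(id)) {
        continue; // the in-bundle clear is honored, so this timer is skipped
      }
      fired.add(id);
      if (id.equals("A")) {
        cleared.add("B"); // timer A's callback clears timer B
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    TreeMap<Long, String> timers = new TreeMap<>();
    timers.put(1L, "A"); // set timer A for 1
    timers.put(2L, "B"); // set timer B for 2
    System.out.println(fireBundle(timers, 3L, false)); // prints [A, B]
    System.out.println(fireBundle(timers, 3L, true));  // prints [A]
  }
}
```

   With overrideWins set to false, B fires even though A cleared it; with it set to true, only A fires. A real test would need the runner to place both timers in the same bundle, which this model simply assumes.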

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -173,48 +174,73 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-
-      final Instant inputWatermarkTime = timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
-      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
+      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
+        toBeFiredTimers.add(timerData);
+        switch (timerData.getDomain()) {
+          case EVENT_TIME:
+            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
+            break;
+          case PROCESSING_TIME:
+            maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
+            break;
+          case SYNCHRONIZED_PROCESSING_TIME:
+            maxSynchronizedProcessingTime =
+                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+        }
+      }
+
+      while (!timerInternals.containsUpdateForTimeBefore(
+              maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
           && !toBeFiredTimers.isEmpty()) {
+
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
+            "Expected Timer %s to be in a        %s, but got %s",

Review comment:
       ```suggestion
               "Expected Timer %s to be in a %s, but got %s",
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       We should document on Timer that set and/or clear calls may not be applied immediately and may only become visible after the current bundle completes, so existing timers that have already become eligible may still fire.







[GitHub] [beam] lukecwik commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-875772153


   > friendly ping
   
   Did you forget to push up any changes (e.g. dropping the debug statement)?





[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-870242016


   friendly ping!





[GitHub] [beam] lukecwik commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660926508



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       I see, if that is the case then great, we should add the tests as I suggested below.







[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-875808163


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-873451062


   Run Java PreCommit





[GitHub] [beam] kennknowles commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r663268046



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       The test Luke describes still seems useful. I would guess that for most runners it will exercise the desired behavior. Since TestStream (by definition) waits for quiescence before processing the next phase, runners will typically bundle everything together because it will all be queued up.







[GitHub] [beam] reuvenlax commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660888061



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       But that is incorrect behavior (as has been agreed on the dev list many times). Set/clear calls should take effect immediately. The fact that this is sometimes not true has caused user bugs before: calls that modify state do take effect immediately, so users see inconsistencies between state and timers.
   
   I have a follow-on PR to fix this bug, so that set/clear calls will be visible immediately.
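
   The state/timer inconsistency described above can be illustrated with a small stand-alone model. This is a sketch under assumptions, not Beam code: StateTimerSketch, its fields, and the immediate flag are hypothetical. State writes apply at once, while timer clears are either applied at once or deferred until the bundle commits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; none of these names come from Beam.
class StateTimerSketch {
  boolean pending = true;  // user state: "a callback is still wanted"
  boolean timerSet = true; // runner-side timer, already eligible to fire
  private final List<Runnable> deferred = new ArrayList<>();
  private final boolean immediate;

  StateTimerSketch(boolean immediate) {
    this.immediate = immediate;
  }

  /** User code cancels the callback: updates the state and clears the timer together. */
  void cancel() {
    pending = false; // state writes are visible immediately
    if (immediate) {
      timerSet = false; // the clear is visible immediately as well
    } else {
      deferred.add(() -> timerSet = false); // the clear is applied only at commit
    }
  }

  /** Fires the timer if it is still set and reports what the callback observes. */
  String fire() {
    if (!timerSet) {
      return "not fired";
    }
    return pending ? "fired, state agrees" : "fired, but state says cancelled";
  }

  /** End of bundle: apply the deferred timer updates. */
  void commit() {
    deferred.forEach(Runnable::run);
    deferred.clear();
  }

  public static void main(String[] args) {
    StateTimerSketch deferredClear = new StateTimerSketch(false);
    deferredClear.cancel();
    System.out.println(deferredClear.fire()); // prints: fired, but state says cancelled

    StateTimerSketch immediateClear = new StateTimerSketch(true);
    immediateClear.cancel();
    System.out.println(immediateClear.fire()); // prints: not fired
  }
}
```

   In the deferred case the callback runs while the state already says the work was cancelled, which is the kind of mismatch users have reported.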







[GitHub] [beam] lukecwik commented on pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15056:
URL: https://github.com/apache/beam/pull/15056#issuecomment-875776767


   LGTM





[GitHub] [beam] reuvenlax commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r663229663



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       I'm not sure how to guarantee that two timers are in the same bundle. Runners can bundle however they please, so simply advancing time to 3 provides no contract that timers A and B will be in the same bundle.







[GitHub] [beam] reuvenlax commented on a change in pull request #15056: [BEAM-10887] Timer clear

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r663268811



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       OK, but I'll add this test to my follow-on PR that actually fixes timers when there are in-bundle modifications. The correct test would currently fail.



