You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/29 16:40:44 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15056: [BEAM-10887] Timer clear

lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660780333



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears a timer. */

Review comment:
       ```suggestion
     /** Previously set timers will become unset. */
   ```

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -670,7 +670,8 @@ private synchronized void updateTimers(TimerUpdate update) {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId());
-
+        System.err.println(

Review comment:
       drop debugging statement

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       Should we cover the case where the timer becomes eligible and still fires even though it is being cleared in the same bundle?
   
   e.g.
   set timer A for 1, set timer B for 2, advance time to 3, have timer A callback clear B, B still fires since it is part of the same bundle

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -173,48 +174,73 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-
-      final Instant inputWatermarkTime = timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
-      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
+      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
+        toBeFiredTimers.add(timerData);
+        switch (timerData.getDomain()) {
+          case EVENT_TIME:
+            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
+            break;
+          case PROCESSING_TIME:
+            maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
+            break;
+          case SYNCHRONIZED_PROCESSING_TIME:
+            maxSynchronizedProcessingTime =
+                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+        }
+      }
+
+      while (!timerInternals.containsUpdateForTimeBefore(
+              maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
           && !toBeFiredTimers.isEmpty()) {
+
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
+            "Expected Timer %s to be in a        %s, but got %s",

Review comment:
       ```suggestion
               "Expected Timer %s to be in a %s, but got %s",
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       We should add documentation to Timer saying that set and/or clear calls may only become visible after this bundle completes and may not be applied immediately allowing for existing timers which have become eligible to still fire.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org