Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/03 00:37:39 UTC

[GitHub] [beam] reuvenlax opened a new pull request #15123: Ensure timer consistency in runners

reuvenlax opened a new pull request #15123:
URL: https://github.com/apache/beam/pull/15123


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-891101600


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890291143


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885752839


   Run Java_Examples_Dataflow_Java11 PreCommit





[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889998004


   Cool, it's been running on a couple of my jobs for a few hours now. Looking good: timers seem to be working correctly, the watermark isn't stuck, etc.





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890013012


   Awesome. Do you have time to look and make sure the code looks good? Kenn
   did a review pass, but I believe he's gone on an extended vacation now.
   
   On Fri, Jul 30, 2021 at 9:15 AM Steven Niemitz ***@***.***>
   wrote:
   
   > cool, it's been running on a couple of my jobs for a few hours now.
   > Looking good, timers seem to be working correctly, watermark isn't stuck,
   > etc.
   





[GitHub] [beam] reuvenlax merged pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #15123:
URL: https://github.com/apache/beam/pull/15123


   





[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889440400


   That makes sense. The code seems reasonable too; I can give this a try on one of my jobs if you want.





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890026079


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-886853816


   friendly ping!





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885749677


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885833649


   Run Java PreCommit





[GitHub] [beam] kennknowles commented on a change in pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r674256062



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
               inputDataWatermark,
               processingTime,
               outputDataWatermark,
-              synchronizedProcessingTime);
+              synchronizedProcessingTime,
+              null);

Review comment:
       Prefer a noop lambda to making this `@Nullable`
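   A minimal sketch of the suggested pattern, assuming a hypothetical `CallbackTimerStore` class in which a `String` stands in for `TimerData` (neither is Beam API): the callback defaults to a shared no-op `Consumer`, so `setTimer` can invoke it unconditionally instead of guarding each call with `if (onTimerModified != null)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for WindmillTimerInternals; String stands in for TimerData.
class CallbackTimerStore {
  // Shared no-op callback used when the caller does not care about modifications.
  private static final Consumer<String> NOOP = timer -> {};

  private final List<String> timers = new ArrayList<>();
  private final Consumer<String> onTimerModified;

  CallbackTimerStore() {
    this(NOOP);
  }

  CallbackTimerStore(Consumer<String> onTimerModified) {
    // Reject null up front so the field is guaranteed non-null afterwards.
    this.onTimerModified = Objects.requireNonNull(onTimerModified);
  }

  void setTimer(String timer) {
    timers.add(timer);
    // No null check needed: the callback is always non-null.
    onTimerModified.accept(timer);
  }

  int size() {
    return timers.size();
  }
}
```

   With this shape, a caller that has no interest in modifications (such as the `start(...)` call site above, which currently passes `null`) would pass the no-op or use the default constructor instead.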

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =
+        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+    timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
     timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+    if (onTimerModified != null) {

Review comment:
       ditto

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered list of any timers that were set or modified by user processing earlier in this
+    // bundle.
+    // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+    // up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle
+        // and cache the list for the rest of this bundle processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and

Review comment:
       Reading kind of quickly, so I probably missed something, but can `cachedFiredUserTimers` just be a navigable set that you insert into when you set a `modifiedUserTimer`? (Hence the earliest eligible timer is always the one to fire next.)
   
   Is this a cost thing or semantics?
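   To make the question concrete, here is a simplified, single-domain sketch of the approach in the diff above, under stated assumptions: `SimpleTimer` and `TimerMerger` are hypothetical types, timers fired by the backend arrive sorted by timestamp, and a `null` entry in the tombstone map marks a timer that was deleted or already delivered. Timers set during the bundle go into a `TreeSet`; each call to `next()` fires any of them that are due no later than the next backend timer, and skips backend timers that the tombstone map shows were reset or deleted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical simplified timer (single time domain), ordered by timestamp, then id.
final class SimpleTimer implements Comparable<SimpleTimer> {
  final String id;
  final long timestamp;

  SimpleTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }

  @Override
  public int compareTo(SimpleTimer other) {
    int byTime = Long.compare(timestamp, other.timestamp);
    return byTime != 0 ? byTime : id.compareTo(other.id);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SimpleTimer)) {
      return false;
    }
    SimpleTimer other = (SimpleTimer) o;
    return id.equals(other.id) && timestamp == other.timestamp;
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, timestamp);
  }
}

// Hypothetical sketch: merge backend-fired timers with timers modified in this bundle.
final class TimerMerger {
  // Timers delivered with the bundle, assumed already sorted by timestamp.
  private final Deque<SimpleTimer> firedInBundle;
  // Timers set during this bundle; a TreeSet keeps them ordered and free of duplicates.
  private final TreeSet<SimpleTimer> modifiedOrdered = new TreeSet<>();
  // Tombstone table: latest state per timer id; null means deleted or already delivered.
  private final Map<String, SimpleTimer> latestById = new HashMap<>();

  TimerMerger(SimpleTimer... fired) {
    this.firedInBundle = new ArrayDeque<>(Arrays.asList(fired));
  }

  void setTimer(String id, long timestamp) {
    SimpleTimer timer = new SimpleTimer(id, timestamp);
    modifiedOrdered.add(timer);
    latestById.put(id, timer);
  }

  void deleteTimer(String id) {
    latestById.put(id, null);
  }

  // True if the timer was superseded (reset, deleted, or delivered) after it was scheduled.
  private boolean isModified(SimpleTimer timer) {
    return latestById.containsKey(timer.id) && !timer.equals(latestById.get(timer.id));
  }

  // Returns the next timer to fire, or null once the bundle's timers are exhausted.
  SimpleTimer next() {
    while (!firedInBundle.isEmpty()) {
      SimpleTimer nextInBundle = firedInBundle.peekFirst();
      // Fire in-bundle modifications that are due no later than the next backend timer.
      while (!modifiedOrdered.isEmpty() && modifiedOrdered.first().compareTo(nextInBundle) <= 0) {
        SimpleTimer earlier = modifiedOrdered.pollFirst();
        if (!isModified(earlier)) {
          deleteTimer(earlier.id); // delivered timers are deleted, as in the diff
          return earlier;
        }
      }
      nextInBundle = firedInBundle.pollFirst();
      if (!isModified(nextInBundle)) {
        deleteTimer(nextInBundle.id);
        return nextInBundle;
      }
    }
    return null;
  }
}
```

   In the test scenario, after `a@10` fires, user code sets `d@15`, deletes `b`, and resets `c` from 30 to 25; the merger then yields `d@15`, skips `b@20`, yields `c@25`, and skips the stale `c@30`. The diff keeps one ordered set per `TimeDomain`; the single-navigable-set alternative raised in the review would avoid the two-source merge, but would presumably require materializing the lazily transformed backend iterator up front.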

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         true);
+    if (onTimerModified != null) {

Review comment:
       Just use a noop fn.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered list of any timers that were set or modified by user processing earlier in this
+    // bundle.
+    // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+    // up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle
+        // and cache the list for the rest of this bundle processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and
+        // fire that first.
+        while (!modifiedUserTimersOrdered.isEmpty()
+            && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+          TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+          if (!timerModified(earlierTimer)) {
+            // We must delete the timer. This prevents it from being committed to the backing store.
+            // It also handles the
+            // case where the timer had been set to the far future and then modified in bundle;
+            // without deleting the
+            // timer, the runner will still have that future timer stored, and would fire it
+            // spuriously.
+            userTimerInternals.deleteTimer(earlierTimer);
+            return earlierTimer;
+          }
+        }
+        // There is no earlier timer to fire, so return the next timer in the bundle.
+        nextInBundle = cachedFiredUserTimers.next();
+        if (!timerModified(nextInBundle)) {
+          // User timers must be explicitly deleted when delivered, to release the implied hold.
+          userTimerInternals.deleteTimer(nextInBundle);
+          return nextInBundle;
+        }
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
-      // User timers must be explicitly deleted when delivered, to release the implied hold
-      userTimerInternals.deleteTimer(nextTimer);
-      return nextTimer;
+      return null;

Review comment:
       Change return value to `@Nullable` (if this is really necessary)

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         false);
+    if (onTimerModified != null) {

Review comment:
       ditto

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =

Review comment:
       Is it possible to combine the overrides here? It seems like a minor risk to have a bunch of methods that each need corresponding changes. I am reacting to the addition of the `onModified` callback to each of them; one centralized method would be good if possible.
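   One way to act on this, sketched with hypothetical `TimerRecord` and `CentralizedTimerInternals` types rather than Beam's `TimerData` and `WindmillTimerInternals`: every public `setTimer`/`deleteTimer` overload only builds a record and delegates to a single private `update` method, which is the only code that mutates the timer table and invokes the modification callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical simplified timer record; not Beam's TimerData.
final class TimerRecord {
  final String id;
  final long timestamp;
  final boolean deleted;

  TimerRecord(String id, long timestamp, boolean deleted) {
    this.id = id;
    this.timestamp = timestamp;
    this.deleted = deleted;
  }
}

// Hypothetical sketch: every public overload funnels into one private update method.
class CentralizedTimerInternals {
  private final Map<String, TimerRecord> timers = new HashMap<>();
  private final Consumer<TimerRecord> onTimerModified;

  CentralizedTimerInternals(Consumer<TimerRecord> onTimerModified) {
    this.onTimerModified = Objects.requireNonNull(onTimerModified);
  }

  void setTimer(TimerRecord timer) {
    update(timer);
  }

  void setTimer(String id, long timestamp) {
    update(new TimerRecord(id, timestamp, false));
  }

  void deleteTimer(TimerRecord timer) {
    update(new TimerRecord(timer.id, timer.timestamp, true));
  }

  // The only place that mutates the table and notifies the callback, so a new
  // overload cannot forget to do either.
  private void update(TimerRecord timer) {
    if (timer.deleted) {
      timers.remove(timer.id);
    } else {
      timers.put(timer.id, timer);
    }
    onTimerModified.accept(timer);
  }

  boolean isSet(String id) {
    return timers.containsKey(id);
  }
}
```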







[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889525484


   @steveniemitz that would be very helpful!





[GitHub] [beam] reuvenlax commented on a change in pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r675089775



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered list of any timers that were set or modified by user processing earlier in this
+    // bundle.
+    // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+    // up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle
+        // and cache the list for the rest of this bundle processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and
+        // fire that first.
+        while (!modifiedUserTimersOrdered.isEmpty()
+            && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+          TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+          if (!timerModified(earlierTimer)) {
+            // We must delete the timer. This prevents it from being committed to the backing store.
+            // It also handles the
+            // case where the timer had been set to the far future and then modified in bundle;
+            // without deleting the
+            // timer, the runner will still have that future timer stored, and would fire it
+            // spuriously.
+            userTimerInternals.deleteTimer(earlierTimer);
+            return earlierTimer;
+          }
+        }
+        // There is no earlier timer to fire, so return the next timer in the bundle.
+        nextInBundle = cachedFiredUserTimers.next();
+        if (!timerModified(nextInBundle)) {
+          // User timers must be explicitly deleted when delivered, to release the implied hold.
+          userTimerInternals.deleteTimer(nextInBundle);
+          return nextInBundle;
+        }
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
-      // User timers must be explicitly deleted when delivered, to release the implied hold
-      userTimerInternals.deleteTimer(nextTimer);
-      return nextTimer;
+      return null;

Review comment:
       This isn't new - null was returned previously as well. Annotating here would require expanding this PR far beyond this file (GroupAlsoByWindows, SimpleParDoFn, etc.), so it seems more appropriate for another PR.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
               inputDataWatermark,
               processingTime,
               outputDataWatermark,
-              synchronizedProcessingTime);
+              synchronizedProcessingTime,
+              null);

Review comment:
       Good suggestion. Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         true);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         false);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =
+        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+    timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
     timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =

Review comment:
       Yeah - done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered set of any timers that were set or modified by user processing earlier in this
+    // bundle. We use a NavigableSet instead of a priority queue to prevent duplicate elements from
+    // ending up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A table of timers, keyed by timer key and namespace, that were modified by user processing
+    // earlier in this bundle. This serves as a tombstone, so that we know not to fire any bundle
+    // timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle and cache the list for the rest of this bundle's processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and

Review comment:
      So that was my initial implementation, and it does work. I changed it to this structure for two main reasons:
      1. This code structure mirrors the one in FnApiDoFnRunner (where timers are presented one at a time), and I preferred using the same algorithm in both places.
      2. The other approach added a bit more code complexity (e.g. deciding when a new timer should be inserted into cachedFiredUserTimers; timers far in the future should not be). I found this code simpler to follow.
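The one-timer-at-a-time merge described above (and in the `getNextFiredUserTimer` loop in the diff) can be sketched in isolation. This is a minimal, hypothetical sketch, not the Dataflow worker code: `TimerMergeSketch`, its `Timer` class, and the single time domain are illustrative assumptions standing in for `TimerData`, `WindmillTimerInternals`, and the per-domain sets. It assumes that any timer due before the next timer already in the bundle is also eligible to fire now, and it leaves modified timers that are later than every bundle timer for a later bundle, in line with the note that timers far in the future should not be inserted.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical, simplified model of merging fired bundle timers with timers modified mid-bundle. */
public class TimerMergeSketch {
  /** Hypothetical stand-in for TimerData: one time domain, no namespace or output timestamp. */
  static final class Timer implements Comparable<Timer> {
    final String id;
    final long timestamp;
    final boolean deleted;

    Timer(String id, long timestamp, boolean deleted) {
      this.id = id;
      this.timestamp = timestamp;
      this.deleted = deleted;
    }

    // Order by firing time, then id. Deleted timers never enter the ordered set.
    @Override
    public int compareTo(Timer other) {
      int byTime = Long.compare(timestamp, other.timestamp);
      return byTime != 0 ? byTime : id.compareTo(other.id);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Timer)) {
        return false;
      }
      Timer other = (Timer) o;
      return id.equals(other.id) && timestamp == other.timestamp && deleted == other.deleted;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, timestamp, deleted);
    }
  }

  private final Iterator<Timer> bundleIterator;
  private Timer nextInBundle; // peeked bundle timer, or null once the bundle is exhausted
  private final NavigableSet<Timer> modifiedOrdered = new TreeSet<>();
  private final Map<String, Timer> modifiedKeys = new HashMap<>(); // tombstones by timer id

  TimerMergeSketch(List<Timer> firedInBundle) {
    this.bundleIterator = firedInBundle.iterator();
    advance();
  }

  private void advance() {
    nextInBundle = bundleIterator.hasNext() ? bundleIterator.next() : null;
  }

  /** Called whenever user code sets or deletes a timer during the bundle. */
  void onTimerModified(Timer timer) {
    if (!timer.deleted) {
      modifiedOrdered.add(timer);
    }
    modifiedKeys.put(timer.id, timer);
  }

  /** True if user code reset or deleted this timer after the bundle was created. */
  private boolean modifiedSinceBundleStart(Timer timer) {
    Timer updated = modifiedKeys.get(timer.id);
    return updated != null && !updated.equals(timer);
  }

  /** Returns the next timer to fire in this bundle, or null if none remain. */
  Timer nextTimer() {
    while (nextInBundle != null) {
      Timer candidate = nextInBundle;
      Timer earliestModified = modifiedOrdered.isEmpty() ? null : modifiedOrdered.first();
      if (earliestModified != null && earliestModified.timestamp < candidate.timestamp) {
        // A timer set earlier in this bundle is due before the next bundle timer: fire it first,
        // unless a later modification superseded it.
        modifiedOrdered.pollFirst();
        if (earliestModified.equals(modifiedKeys.get(earliestModified.id))) {
          return earliestModified;
        }
        continue;
      }
      advance();
      if (!modifiedSinceBundleStart(candidate)) {
        modifiedOrdered.remove(candidate); // user may have re-set it to the same value; fire once
        return candidate;
      }
      // Otherwise the bundle timer is tombstoned and is skipped.
    }
    // Modified timers later than every bundle timer are left for a later bundle.
    return null;
  }

  public static void main(String[] args) {
    TimerMergeSketch merge =
        new TimerMergeSketch(List.of(new Timer("a", 10, false), new Timer("b", 20, false)));
    // While processing elements earlier in the bundle, user code deletes "b" and sets "c" to 15.
    merge.onTimerModified(new Timer("b", 20, true));
    merge.onTimerModified(new Timer("c", 15, false));
    for (Timer t = merge.nextTimer(); t != null; t = merge.nextTimer()) {
      System.out.println(t.id + "@" + t.timestamp); // prints a@10, then c@15
    }
  }
}
```

The `TreeSet` plays the role of the `NavigableSet` in the diff: adding the same timer twice keeps a single entry, and `remove` lets a bundle timer that fires drop an equal copy that user code re-set, so that timer does not fire twice. A `PriorityQueue` would keep duplicates and needs a linear-time `remove(Object)`.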







[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890550702


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-884337504


   R: @kennknowles 





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890415908


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889412539


   R: @steveniemitz





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-881575422


   R: @lcwik





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889437010


   Correct, this is a long-standing bug that has bitten a number of users.
   State variables are updated immediately, but timer state was only updated
   between bundles. This meant that if you tried to keep the timer timestamps
   in state, you would sometimes observe inconsistent results depending on how
   the runner bundled things. The recent addition of Timer.clear() makes this
   even worse, as you could clear a timer and then have it still fire if it
   was already in the bundle.
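To make the inconsistency concrete, here is a hypothetical model (not Beam or Dataflow code) in which the runner snapshots the timers to fire when the bundle starts, while user code has already cleared one of them, for example via Timer.clear(). `StaleTimerSketch` and `fire` are illustrative names; the `consistent` flag approximates a tombstone check by comparing each snapshotted timer against its current value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the bug: timers snapshotted at bundle start vs. timers changed mid-bundle. */
public class StaleTimerSketch {
  /**
   * Fires the timers snapshotted at bundle start.
   *
   * @param snapshot timer id -> timestamp, as computed when the bundle started
   * @param live timer id -> timestamp after user code ran (cleared timers are absent)
   * @param consistent if true, skip snapshot timers whose live value no longer matches
   */
  static List<String> fire(Map<String, Long> snapshot, Map<String, Long> live, boolean consistent) {
    List<String> fired = new ArrayList<>();
    for (Map.Entry<String, Long> e : snapshot.entrySet()) {
      if (consistent && !e.getValue().equals(live.get(e.getKey()))) {
        continue; // cleared or reset earlier in this bundle, so it must not fire
      }
      fired.add(e.getKey());
    }
    return fired;
  }

  public static void main(String[] args) {
    Map<String, Long> snapshot = new LinkedHashMap<>();
    snapshot.put("flush", 100L);
    snapshot.put("expiry", 200L);
    // Earlier in the bundle, user code cleared "expiry".
    Map<String, Long> live = new HashMap<>(snapshot);
    live.remove("expiry");
    System.out.println(fire(snapshot, live, false)); // [flush, expiry]: the cleared timer fires
    System.out.println(fire(snapshot, live, true)); // [flush]
  }
}
```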
   
   On Thu, Jul 29, 2021 at 1:25 PM Steven Niemitz wrote:
   
   > it seems like the intent here is to make it so timers set/cleared in a
   > bundle can modify timers that were queued to fire in the same bundle (so
   > they no longer fire)? Am I understanding the problem correctly?
   





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-881557003


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890559320


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283083


   Run Java_Examples_Dataflow_Java11 PreCommit





[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890023188


   yep, code looked good to me!





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890552844


   Run Java PreCommi





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890565355


   Run Java PreCommit





[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889434662


   It seems like the intent here is to make it so timers set/cleared in a bundle can modify timers that were queued to fire in the same bundle (so they no longer fire)? Am I understanding the problem correctly?





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283391


   Run Portable_Python PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890426959


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885749800


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-886237591


   Run Java PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283069


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890403323


   Run Java Flink PortableValidatesRunner Streaming

