Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/22 19:05:21 UTC

[GitHub] [beam] reuvenlax commented on a change in pull request #15123: Ensure timer consistency in runners

reuvenlax commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r675089775



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered list of any timers that were set or modified by user processing earlier in this
+    // bundle.
+    // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+    // up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle
+        // and cache the list for the rest of this bundle processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and
+        // fire that first.
+        while (!modifiedUserTimersOrdered.isEmpty()
+            && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+          TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+          if (!timerModified(earlierTimer)) {
+            // We must delete the timer. This prevents it from being committed to the backing store.
+            // It also handles the
+            // case where the timer had been set to the far future and then modified in bundle;
+            // without deleting the
+            // timer, the runner will still have that future timer stored, and would fire it
+            // spuriously.
+            userTimerInternals.deleteTimer(earlierTimer);
+            return earlierTimer;
+          }
+        }
+        // There is no earlier timer to fire, so return the next timer in the bundle.
+        nextInBundle = cachedFiredUserTimers.next();
+        if (!timerModified(nextInBundle)) {
+          // User timers must be explicitly deleted when delivered, to release the implied hold.
+          userTimerInternals.deleteTimer(nextInBundle);
+          return nextInBundle;
+        }
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
-      // User timers must be explicitly deleted when delivered, to release the implied hold
-      userTimerInternals.deleteTimer(nextTimer);
-      return nextTimer;
+      return null;

Review comment:
       This isn't new: null was returned previously as well. Annotating here would require expanding this PR far beyond this file (GroupAlsoByWindows, SimpleParDoFn, etc.), so it seems more appropriate for another PR.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
               inputDataWatermark,
               processingTime,
               outputDataWatermark,
-              synchronizedProcessingTime);
+              synchronizedProcessingTime,
+              null);

Review comment:
       Good suggestion. Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         true);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         false);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =
+        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+    timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
     timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+    if (onTimerModified != null) {

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =

Review comment:
       Yeah - done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered list of any timers that were set or modified by user processing earlier in this
+    // bundle.
+    // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+    // up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle
+        // and cache the list for the rest of this bundle processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try and

Review comment:
       That was my initial implementation, and it does work. I changed it to this structure for two main reasons:
       1. This code structure mirrors the one in FnApiDoFnRunner (where timers are presented one at a time), and I preferred using the same algorithm in both places.
       2. The other approach added a bit more code complexity (e.g. deciding when a new timer should be inserted into cachedFiredUserTimers; timers far in the future should not be). I found this code simpler to follow.
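
       For readers following the thread, here is a minimal sketch of the one-at-a-time merge described above. It is a simplified model, not the code in this PR: the Timer class, setTimer, next and demo are hypothetical names, timestamps are plain longs, and deletion handling (deleteTimer and the tombstone entry it records) is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified model of the merge; none of these names are Beam APIs.
class TimerMergeSketch {
  // Toy timer: an id plus a firing timestamp (a stand-in for TimerData).
  static final class Timer {
    final String id;
    final long timestamp;

    Timer(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return id + "@" + timestamp;
    }
  }

  // Order by timestamp, then id, so the sorted set never holds two equal entries.
  private static final Comparator<Timer> ORDER =
      Comparator.<Timer>comparingLong(t -> t.timestamp).thenComparing(t -> t.id);

  // Timers delivered with the bundle, already in firing order.
  private final Deque<Timer> firedInBundle;
  // Timers set by user code earlier in this bundle, kept sorted.
  private final TreeSet<Timer> modifiedOrdered = new TreeSet<>(ORDER);
  // Latest in-bundle timestamp per timer id; used to skip stale values.
  private final Map<String, Long> latestTimestampById = new HashMap<>();

  TimerMergeSketch(List<Timer> fired) {
    this.firedInBundle = new ArrayDeque<>(fired);
  }

  // Called when user code sets or resets a timer while the bundle is being processed.
  void setTimer(String id, long timestamp) {
    modifiedOrdered.add(new Timer(id, timestamp));
    latestTimestampById.put(id, timestamp);
  }

  // True if the timer was reset in this bundle to a different timestamp.
  private boolean superseded(Timer t) {
    Long latest = latestTimestampById.get(t.id);
    return latest != null && latest != t.timestamp;
  }

  // Returns the next timer to fire, or null once the bundle's timers are drained.
  Timer next() {
    while (!firedInBundle.isEmpty()) {
      Timer nextInBundle = firedInBundle.peekFirst();
      // Fire any in-bundle timer that is due no later than the next bundle timer.
      while (!modifiedOrdered.isEmpty()
          && ORDER.compare(modifiedOrdered.first(), nextInBundle) <= 0) {
        Timer earlier = modifiedOrdered.pollFirst();
        if (!superseded(earlier)) {
          return earlier;
        }
      }
      nextInBundle = firedInBundle.pollFirst();
      if (!superseded(nextInBundle)) {
        return nextInBundle;
      }
    }
    return null;
  }

  // Fires A, then sets D@15, resets C to 25, and sets a far-future E@100.
  static List<String> demo() {
    TimerMergeSketch merge =
        new TimerMergeSketch(List.of(new Timer("A", 10), new Timer("B", 20), new Timer("C", 30)));
    List<String> firedOrder = new ArrayList<>();
    firedOrder.add(merge.next().toString());
    merge.setTimer("D", 15);
    merge.setTimer("C", 25);
    merge.setTimer("E", 100);
    for (Timer t = merge.next(); t != null; t = merge.next()) {
      firedOrder.add(t.toString());
    }
    return firedOrder;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

       In this model, demo() fires A@10, D@15, B@20, C@25: the original C@30 is skipped because it was reset, and E@100 never fires in this bundle because the loop stops once the bundle's own timers run out. That last property is what avoids deciding whether a newly set timer belongs in the cached list.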



