Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/21 19:14:45 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #15123: Ensure timer consistency in runners

kennknowles commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r674256062



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
               inputDataWatermark,
               processingTime,
               outputDataWatermark,
-              synchronizedProcessingTime);
+              synchronizedProcessingTime,
+              null);

Review comment:
       Prefer a noop lambda to making this `@Nullable`
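A minimal sketch of that suggestion. It uses hypothetical, simplified `SimpleTimer` and `CallbackTimerInternals` types, not the real Dataflow worker classes. The callback defaults to a no-op `Consumer`, so the set and delete paths can call it without a null check.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for Beam's TimerData.
final class SimpleTimer {
  final String id;
  final long timestamp;

  SimpleTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

// Hypothetical timer-internals class whose callback field is never null.
final class CallbackTimerInternals {
  // Shared no-op used when the caller does not need modification notifications.
  private static final Consumer<SimpleTimer> NO_OP = timer -> {};

  private final Consumer<SimpleTimer> onTimerModified;

  CallbackTimerInternals() {
    this(NO_OP);
  }

  CallbackTimerInternals(Consumer<SimpleTimer> onTimerModified) {
    // Reject null up front so every later call site can invoke the callback directly.
    this.onTimerModified = Objects.requireNonNull(onTimerModified);
  }

  void setTimer(SimpleTimer timer) {
    // ... record the timer in local state (omitted in this sketch) ...
    onTimerModified.accept(timer); // no null check needed
  }

  void deleteTimer(SimpleTimer timer) {
    // ... mark the timer deleted in local state (omitted in this sketch) ...
    onTimerModified.accept(timer);
  }
}
```

With this shape, the `null` argument in the hunk above would become a no-op lambda such as `timer -> {}`.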

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =
+        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+    timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
     timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+    if (onTimerModified != null) {

Review comment:
       ditto

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered set of any timers that were set or modified by user processing earlier in this
+    // bundle. We use a NavigableSet instead of a priority queue to prevent duplicate elements from
+    // ending up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A table of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle and cache them for the rest of this bundle's processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try to

Review comment:
       Reading kind of quickly, so I probably missed something, but can `cachedFiredUserTimers` just be a navigable set that you insert into whenever you set a `modifiedUserTimer`? (Then the earliest eligible timer is always the one to fire next.)
   
   Is this a cost thing or a semantics thing?
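A minimal single-time-domain sketch of the suggestion above. The names `PendingTimer`, `SingleQueueTimers`, and the `upTo` bound are hypothetical. One semantic point the sketch has to make explicit is that a timer set during the bundle should only fire once it is eligible. Here that is modeled with the hypothetical `upTo` bound (for example, the input watermark for event-time timers). The PR's loop instead only fires a modified timer if it is no later than a timer already delivered in the bundle. Ordering by timestamp and then id also gives the duplicate suppression that the diff's NavigableSet comment relies on.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical simplified timer: an id plus a firing timestamp (one time domain only).
final class PendingTimer {
  final String id;
  final long timestamp;

  PendingTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

// Sketch: one ordered set holds every timer that may still fire in this bundle.
final class SingleQueueTimers {
  // Order by timestamp, then id; a given (timestamp, id) pair is stored at most once.
  private final NavigableSet<PendingTimer> ordered =
      new TreeSet<>(
          Comparator.<PendingTimer>comparingLong(t -> t.timestamp).thenComparing(t -> t.id));
  // Current version of each timer id, so a reset or delete can find the stale entry.
  private final Map<String, PendingTimer> byId = new HashMap<>();

  SingleQueueTimers(List<PendingTimer> firedInBundle) {
    for (PendingTimer timer : firedInBundle) {
      set(timer);
    }
  }

  // Setting (or resetting) a timer replaces any earlier version with the same id.
  void set(PendingTimer timer) {
    delete(timer.id);
    ordered.add(timer);
    byId.put(timer.id, timer);
  }

  void delete(String id) {
    PendingTimer stale = byId.remove(id);
    if (stale != null) {
      ordered.remove(stale);
    }
  }

  // Returns the earliest timer whose timestamp is <= upTo (hypothetical eligibility bound),
  // or null if no timer is eligible.
  PendingTimer next(long upTo) {
    if (ordered.isEmpty() || ordered.first().timestamp > upTo) {
      return null;
    }
    PendingTimer first = ordered.pollFirst();
    byId.remove(first.id);
    return first;
  }
}
```

In this form, every delivered timer is copied into the set up front. Whether that costs more than the PR's lazy merge of two sources likely depends on how many timers a bundle carries.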

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         true);
+    if (onTimerModified != null) {

Review comment:
       Just use a noop fn.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
     }
 
     // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+    // An ordered set of any timers that were set or modified by user processing earlier in this
+    // bundle. We use a NavigableSet instead of a priority queue to prevent duplicate elements from
+    // ending up in the queue.
+    private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+    private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+    private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return modifiedUserEventTimersOrdered;
+        case PROCESSING_TIME:
+          return modifiedUserProcessingTimersOrdered;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return modifiedUserSynchronizedProcessingTimersOrdered;
+        default:
+          throw new RuntimeException("Unexpected time domain " + timeDomain);
+      }
+    }
+
+    // A table of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so that we know not to fire any bundle timers that were modified.
+    private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+    private void onUserTimerModified(TimerData timerData) {
+      if (!timerData.getDeleted()) {
+        getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+      }
+      modifiedUserTimerKeys.put(
+          WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+    }
+
+    private boolean timerModified(TimerData timerData) {
+      String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+      @Nullable
+      TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+      return updatedTimer != null && !updatedTimer.equals(timerData);
+    }
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
       if (cachedFiredUserTimers == null) {
+        // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+        // from the bundle and cache them for the rest of this bundle's processing.
         cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+            Iterators.peekingIterator(
+                FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+                    .filter(
+                        timer ->
+                            WindmillTimerInternals.isUserTimer(timer)
+                                && timer.getStateFamily().equals(stateFamily))
+                    .transform(
+                        timer ->
+                            WindmillTimerInternals.windmillTimerToTimerData(
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+                    .iterator());
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
-        return null;
+      while (cachedFiredUserTimers.hasNext()) {
+        TimerData nextInBundle = cachedFiredUserTimers.peek();
+        NavigableSet<TimerData> modifiedUserTimersOrdered =
+            getModifiedUserTimersOrdered(nextInBundle.getDomain());
+        // If there is a modified timer that is earlier than the next timer in the bundle, try to
+        // fire that first.
+        while (!modifiedUserTimersOrdered.isEmpty()
+            && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+          TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+          if (!timerModified(earlierTimer)) {
+            // We must delete the timer. This prevents it from being committed to the backing store.
+            // It also handles the case where the timer had been set to the far future and then
+            // modified in the bundle; without deleting the timer, the runner would still have that
+            // future timer stored and would fire it spuriously.
+            userTimerInternals.deleteTimer(earlierTimer);
+            return earlierTimer;
+          }
+        }
+        // There is no earlier timer to fire, so return the next timer in the bundle.
+        nextInBundle = cachedFiredUserTimers.next();
+        if (!timerModified(nextInBundle)) {
+          // User timers must be explicitly deleted when delivered, to release the implied hold.
+          userTimerInternals.deleteTimer(nextInBundle);
+          return nextInBundle;
+        }
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
-      // User timers must be explicitly deleted when delivered, to release the implied hold
-      userTimerInternals.deleteTimer(nextTimer);
-      return nextTimer;
+      return null;

Review comment:
       Annotate the return type as `@Nullable` (if returning null is really necessary).
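For context, here is a simplified sketch of the merge loop quoted above. It returns null once the bundle's timers are exhausted, which is the return value the comment asks to annotate. The sketch assumes a hypothetical `FiredTimer` type, a single time domain, and bundle timers that arrive in timestamp order. It does not pull in an annotation library, so the nullable return is documented in a comment. Delivering a timer records a deleted tombstone, standing in for the real code's `userTimerInternals.deleteTimer(...)` call. In the sketch, that tombstone is what keeps a timer from being delivered twice.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical simplified timer; equality covers id, timestamp and the deleted flag.
final class FiredTimer {
  final String id;
  final long timestamp;
  final boolean deleted;

  FiredTimer(String id, long timestamp, boolean deleted) {
    this.id = id;
    this.timestamp = timestamp;
    this.deleted = deleted;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FiredTimer)) {
      return false;
    }
    FiredTimer other = (FiredTimer) o;
    return id.equals(other.id) && timestamp == other.timestamp && deleted == other.deleted;
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, timestamp, deleted);
  }
}

// Sketch: merge the bundle's fired timers with timers modified during the bundle.
final class MergedTimerSource {
  private static final Comparator<FiredTimer> ORDER =
      Comparator.<FiredTimer>comparingLong(t -> t.timestamp).thenComparing(t -> t.id);

  private final Deque<FiredTimer> bundle; // assumed to be in timestamp order
  private final NavigableSet<FiredTimer> modified = new TreeSet<>(ORDER);
  private final Map<String, FiredTimer> latest = new HashMap<>(); // tombstones by timer id

  MergedTimerSource(List<FiredTimer> firedInBundle) {
    this.bundle = new ArrayDeque<>(firedInBundle);
  }

  // Called whenever user code sets or deletes a timer.
  void onUserTimerModified(FiredTimer timer) {
    if (!timer.deleted) {
      modified.add(timer);
    }
    latest.put(timer.id, timer);
  }

  // True if a later set or delete replaced this version of the timer.
  private boolean superseded(FiredTimer timer) {
    FiredTimer current = latest.get(timer.id);
    return current != null && !current.equals(timer);
  }

  // Stand-in for deleting a delivered timer: leaves a deleted tombstone behind.
  private FiredTimer deliver(FiredTimer timer) {
    onUserTimerModified(new FiredTimer(timer.id, timer.timestamp, true));
    return timer;
  }

  // Returns the next timer to fire, or null (the nullable case) once the bundle is exhausted.
  FiredTimer next() {
    while (!bundle.isEmpty()) {
      FiredTimer nextInBundle = bundle.peekFirst();
      // Fire modified timers that are no later than the next bundle timer first.
      while (!modified.isEmpty() && ORDER.compare(modified.first(), nextInBundle) <= 0) {
        FiredTimer earlier = modified.pollFirst();
        if (!superseded(earlier)) {
          return deliver(earlier);
        }
      }
      nextInBundle = bundle.pollFirst();
      if (!superseded(nextInBundle)) {
        return deliver(nextInBundle);
      }
    }
    return null;
  }
}
```

A caller would typically drain it with `for (FiredTimer t = src.next(); t != null; t = src.next()) { ... }`.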

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         false);
+    if (onTimerModified != null) {

Review comment:
       ditto

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+    TimerData timer =

Review comment:
       Is it possible to combine the overrides here? Having several methods that each need the same corresponding change seems like a minor risk; I am reacting to the addition of the `onModified` callback to each of them. One centralized method would be better, if possible.
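One way to centralize it is sketched below with hypothetical names (`TimerEntry`, `CentralizedTimerInternals`, `recordModification`). The sketch uses far fewer parameters than the real `WindmillTimerInternals` methods, which also take a namespace, timer family, output timestamp, and time domain. Every overload funnels into a single private method that updates local state and invokes the callback, so a later change to the callback touches only one place.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical simplified timer record.
final class TimerEntry {
  final String id;
  final long timestamp;
  final boolean deleted;

  TimerEntry(String id, long timestamp, boolean deleted) {
    this.id = id;
    this.timestamp = timestamp;
    this.deleted = deleted;
  }
}

// Sketch: every public overload funnels into recordModification.
final class CentralizedTimerInternals {
  private final Map<String, TimerEntry> timers = new HashMap<>();
  private final Consumer<TimerEntry> onTimerModified;

  CentralizedTimerInternals(Consumer<TimerEntry> onTimerModified) {
    this.onTimerModified = Objects.requireNonNull(onTimerModified);
  }

  // Convenience overload: builds the entry, then delegates.
  void setTimer(String id, long timestamp) {
    setTimer(new TimerEntry(id, timestamp, false));
  }

  void setTimer(TimerEntry timer) {
    recordModification(timer);
  }

  void deleteTimer(TimerEntry timer) {
    recordModification(new TimerEntry(timer.id, timer.timestamp, true));
  }

  // The only place that updates local state and notifies the callback.
  private void recordModification(TimerEntry timer) {
    timers.put(timer.id, timer); // latest version (set or deleted) of each timer
    onTimerModified.accept(timer);
  }

  boolean isSet(String id) {
    TimerEntry entry = timers.get(id);
    return entry != null && !entry.deleted;
  }
}
```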




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
