Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/03 00:37:39 UTC
[GitHub] [beam] reuvenlax opened a new pull request #15123: Ensure timer consistency in runners
reuvenlax opened a new pull request #15123:
URL: https://github.com/apache/beam/pull/15123
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-891101600
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890291143
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885752839
Run Java_Examples_Dataflow_Java11 PreCommit
[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889998004
Cool, it's been running on a couple of my jobs for a few hours now. Looking good: timers seem to be working correctly, the watermark isn't stuck, etc.
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890013012
Awesome. Do you have time to look and make sure the code looks good? Kenn
did a review pass, but I believe he's gone on an extended vacation now.
On Fri, Jul 30, 2021 at 9:15 AM Steven Niemitz ***@***.***>
wrote:
> Cool, it's been running on a couple of my jobs for a few hours now.
> Looking good: timers seem to be working correctly, the watermark isn't
> stuck, etc.
[GitHub] [beam] reuvenlax merged pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #15123:
URL: https://github.com/apache/beam/pull/15123
[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889440400
That makes sense. The code seems reasonable too; I can give this a try on one of my jobs if you want.
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890026079
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-886853816
friendly ping!
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885749677
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885833649
Run Java PreCommit
[GitHub] [beam] kennknowles commented on a change in pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r674256062
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
inputDataWatermark,
processingTime,
outputDataWatermark,
- synchronizedProcessingTime);
+ synchronizedProcessingTime,
+ null);
Review comment:
Prefer a noop lambda to making this `@Nullable`
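For readers following along, here is a minimal sketch of the no-op callback pattern suggested above. It is not the Beam code: `NoopCallbackSketch`, `TimerRegistry`, and the `String` timer ids are hypothetical stand-ins, and only `java.util.function.Consumer` is a real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a timer-internals class; not Beam's WindmillTimerInternals.
public class NoopCallbackSketch {
  static final class TimerRegistry {
    private final Consumer<String> onTimerModified;
    private final List<String> timers = new ArrayList<>();

    // Callers that do not care about modifications get a no-op callback, not null.
    TimerRegistry() {
      this(timerId -> {});
    }

    TimerRegistry(Consumer<String> onTimerModified) {
      this.onTimerModified = onTimerModified;
    }

    void setTimer(String timerId) {
      timers.add(timerId);
      // The field is never null, so no null check is needed here.
      onTimerModified.accept(timerId);
    }
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    TimerRegistry observed = new TimerRegistry(seen::add);
    observed.setTimer("t1");

    TimerRegistry silent = new TimerRegistry();
    silent.setTimer("t2"); // invokes the no-op callback

    System.out.println(seen); // prints [t1]
  }
}
```

With this shape, the `if (onTimerModified != null)` guards flagged in the other comments would become unnecessary.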
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
+ TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+ timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+ if (onTimerModified != null) {
Review comment:
ditto
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
Review comment:
I'm reading quickly, so I may have missed something, but can `cachedFiredUserTimers` just be a navigable set that you insert into whenever you set a `modifiedUserTimer`? (Then the earliest eligible timer is always the one to fire next.)
Is this a cost thing or semantics?
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
true);
+ if (onTimerModified != null) {
Review comment:
Just use a noop fn.
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
+ // fire that first.
+ while (!modifiedUserTimersOrdered.isEmpty()
+ && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+ TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+ if (!timerModified(earlierTimer)) {
+ // We must delete the timer. This prevents it from being committed to the backing store.
+ // It also handles the
+ // case where the timer had been set to the far future and then modified in bundle;
+ // without deleting the
+ // timer, the runner will still have that future timer stored, and would fire it
+ // spuriously.
+ userTimerInternals.deleteTimer(earlierTimer);
+ return earlierTimer;
+ }
+ }
+ // There is no earlier timer to fire, so return the next timer in the bundle.
+ nextInBundle = cachedFiredUserTimers.next();
+ if (!timerModified(nextInBundle)) {
+ // User timers must be explicitly deleted when delivered, to release the implied hold.
+ userTimerInternals.deleteTimer(nextInBundle);
+ return nextInBundle;
+ }
}
- TimerData nextTimer = cachedFiredUserTimers.next();
- // User timers must be explicitly deleted when delivered, to release the implied hold
- userTimerInternals.deleteTimer(nextTimer);
- return nextTimer;
+ return null;
Review comment:
Change return value to `@Nullable` (if this is really necessary)
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
false);
+ if (onTimerModified != null) {
Review comment:
ditto
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
Review comment:
Is it possible to combine the overrides here? Seems like a minor risk to have a bunch of methods that have to have corresponding changes here. I am reacting to the addition of the `onModified` callback to each of them. Seems like one centralized method would be good if possible.
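One way the centralization suggested above could look, as a rough sketch only. `CentralizedTimerUpdateSketch`, `SimpleTimer`, and `recordUpdate` are hypothetical names invented for illustration, and the real overloads take more parameters. The point is that every public overload funnels into one private method, which is the only place that invokes the callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class CentralizedTimerUpdateSketch {
  // Hypothetical simplified timer; Beam's TimerData carries more fields.
  static final class SimpleTimer {
    final String id;
    final long timestampMillis;
    final boolean deleted;

    SimpleTimer(String id, long timestampMillis, boolean deleted) {
      this.id = id;
      this.timestampMillis = timestampMillis;
      this.deleted = deleted;
    }
  }

  private final Map<String, SimpleTimer> timers = new HashMap<>();
  private final Consumer<SimpleTimer> onTimerModified;

  CentralizedTimerUpdateSketch(Consumer<SimpleTimer> onTimerModified) {
    this.onTimerModified = onTimerModified;
  }

  // Each public entry point only builds a SimpleTimer and delegates.
  void setTimer(SimpleTimer timer) {
    recordUpdate(timer);
  }

  void setTimer(String id, long timestampMillis) {
    recordUpdate(new SimpleTimer(id, timestampMillis, false));
  }

  void deleteTimer(String id) {
    recordUpdate(new SimpleTimer(id, 0L, true));
  }

  // The single place that mutates state and notifies the listener.
  private void recordUpdate(SimpleTimer timer) {
    timers.put(timer.id, timer);
    onTimerModified.accept(timer);
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    CentralizedTimerUpdateSketch internals =
        new CentralizedTimerUpdateSketch(
            t -> events.add((t.deleted ? "delete:" : "set:") + t.id));
    internals.setTimer("a", 100L);
    internals.setTimer(new SimpleTimer("b", 200L, false));
    internals.deleteTimer("a");
    System.out.println(events); // prints [set:a, set:b, delete:a]
  }
}
```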
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889525484
@steveniemitz that would be very helpful!
[GitHub] [beam] reuvenlax commented on a change in pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #15123:
URL: https://github.com/apache/beam/pull/15123#discussion_r675089775
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
+ // fire that first.
+ while (!modifiedUserTimersOrdered.isEmpty()
+ && modifiedUserTimersOrdered.first().compareTo(nextInBundle) <= 0) {
+ TimerData earlierTimer = modifiedUserTimersOrdered.pollFirst();
+ if (!timerModified(earlierTimer)) {
+ // We must delete the timer. This prevents it from being committed to the backing store.
+ // It also handles the
+ // case where the timer had been set to the far future and then modified in bundle;
+ // without deleting the
+ // timer, the runner will still have that future timer stored, and would fire it
+ // spuriously.
+ userTimerInternals.deleteTimer(earlierTimer);
+ return earlierTimer;
+ }
+ }
+ // There is no earlier timer to fire, so return the next timer in the bundle.
+ nextInBundle = cachedFiredUserTimers.next();
+ if (!timerModified(nextInBundle)) {
+ // User timers must be explicitly deleted when delivered, to release the implied hold.
+ userTimerInternals.deleteTimer(nextInBundle);
+ return nextInBundle;
+ }
}
- TimerData nextTimer = cachedFiredUserTimers.next();
- // User timers must be explicitly deleted when delivered, to release the implied hold
- userTimerInternals.deleteTimer(nextTimer);
- return nextTimer;
+ return null;
Review comment:
This isn't new - null was returned previously as well. Annotating here will require expanding this PR far beyond this file (GroupAlsoByWindows, SimpleParDoFn, etc.), so it seems more appropriate for another PR.
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -528,7 +535,8 @@ public void start(
inputDataWatermark,
processingTime,
outputDataWatermark,
- synchronizedProcessingTime);
+ synchronizedProcessingTime,
+ null);
Review comment:
Good suggestion. done
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -104,6 +109,9 @@ public void setTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
true);
+ if (onTimerModified != null) {
Review comment:
done
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -136,6 +150,9 @@ public void deleteTimer(TimerData timerKey) {
getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
timerKey.getNamespace(),
false);
+ if (onTimerModified != null) {
Review comment:
done
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
+ TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
+ timers.put(getTimerDataKey(timerId, timerFamilyId), namespace, timer);
timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true);
+ if (onTimerModified != null) {
Review comment:
done
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -114,14 +122,20 @@ public void setTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
- timers.put(
- getTimerDataKey(timerId, timerFamilyId),
- namespace,
- TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain));
+ TimerData timer =
Review comment:
Yeah - done
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -581,30 +594,96 @@ public void flushState() {
}
// Lazily initialized
- private Iterator<TimerData> cachedFiredUserTimers = null;
+ private PeekingIterator<TimerData> cachedFiredUserTimers = null;
+ // An ordered list of any timers that were set or modified by user processing earlier in this
+ // bundle.
+ // We use a NavigableSet instead of a priority queue to prevent duplicate elements from ending
+ // up in the queue.
+ private NavigableSet<TimerData> modifiedUserEventTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserProcessingTimersOrdered = null;
+ private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
+
+ private NavigableSet<TimerData> getModifiedUserTimersOrdered(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return modifiedUserEventTimersOrdered;
+ case PROCESSING_TIME:
+ return modifiedUserProcessingTimersOrdered;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return modifiedUserSynchronizedProcessingTimersOrdered;
+ default:
+ throw new RuntimeException("Unexpected time domain " + timeDomain);
+ }
+ }
+
+    // A list of timer keys that were modified by user processing earlier in this bundle. This
+    // serves as a tombstone, so
+    // that we know not to fire any bundle timers that were modified.
+ private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
+
+ private void onUserTimerModified(TimerData timerData) {
+ if (!timerData.getDeleted()) {
+ getModifiedUserTimersOrdered(timerData.getDomain()).add(timerData);
+ }
+ modifiedUserTimerKeys.put(
+ WindmillTimerInternals.getTimerDataKey(timerData), timerData.getNamespace(), timerData);
+ }
+
+ private boolean timerModified(TimerData timerData) {
+ String timerKey = WindmillTimerInternals.getTimerDataKey(timerData);
+ @Nullable
+ TimerData updatedTimer = modifiedUserTimerKeys.get(timerKey, timerData.getNamespace());
+ return updatedTimer != null && !updatedTimer.equals(timerData);
+ }
public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
+ // This is the first call to getNextFiredUserTimer in this bundle. Extract any user timers
+ // from the bundle
+ // and cache the list for the rest of this bundle processing.
cachedFiredUserTimers =
- FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
- .transform(
- timer ->
- WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
- .iterator();
+ Iterators.peekingIterator(
+ FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+ .filter(
+ timer ->
+ WindmillTimerInternals.isUserTimer(timer)
+ && timer.getStateFamily().equals(stateFamily))
+ .transform(
+ timer ->
+ WindmillTimerInternals.windmillTimerToTimerData(
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+ .iterator());
}
- if (!cachedFiredUserTimers.hasNext()) {
- return null;
+ while (cachedFiredUserTimers.hasNext()) {
+ TimerData nextInBundle = cachedFiredUserTimers.peek();
+ NavigableSet<TimerData> modifiedUserTimersOrdered =
+ getModifiedUserTimersOrdered(nextInBundle.getDomain());
+ // If there is a modified timer that is earlier than the next timer in the bundle, try and
Review comment:
So that was my initial implementation, and it does work. I changed it to this structure for two main reasons:
1. This code structure mirrors the one in FnApiDoFnRunner (where timers are presented one at a time), and I preferred using the same algorithm in both places.
2. The other approach added a bit more code complexity (e.g. deciding when a new timer should be inserted into cachedFiredUserTimers - timers far in the future should not be). I found this code to be simpler to follow.
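To make the structure under discussion easier to follow outside the diff, here is a simplified, self-contained sketch of the merge loop. It is an illustration under stated assumptions, not the Dataflow worker code: `TimerMergeSketch`, `SimpleTimer`, and `isStale` are hypothetical names, a single time domain is assumed, and an `ArrayDeque` stands in for Guava's `PeekingIterator`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;

public class TimerMergeSketch {
  // Hypothetical simplified timer, ordered by timestamp and then by id.
  static final class SimpleTimer implements Comparable<SimpleTimer> {
    final String id;
    final long timestamp;
    final boolean deleted;

    SimpleTimer(String id, long timestamp, boolean deleted) {
      this.id = id;
      this.timestamp = timestamp;
      this.deleted = deleted;
    }

    @Override
    public int compareTo(SimpleTimer o) {
      int c = Long.compare(timestamp, o.timestamp);
      return c != 0 ? c : id.compareTo(o.id);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SimpleTimer)) return false;
      SimpleTimer t = (SimpleTimer) o;
      return id.equals(t.id) && timestamp == t.timestamp && deleted == t.deleted;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, timestamp, deleted);
    }
  }

  private final Deque<SimpleTimer> firedInBundle; // timers delivered with the bundle
  private final NavigableSet<SimpleTimer> modifiedOrdered = new TreeSet<>();
  private final Map<String, SimpleTimer> latestById = new HashMap<>(); // tombstones

  TimerMergeSketch(List<SimpleTimer> firedSortedByTime) {
    this.firedInBundle = new ArrayDeque<>(firedSortedByTime);
  }

  void setTimer(String id, long timestamp) {
    SimpleTimer t = new SimpleTimer(id, timestamp, false);
    modifiedOrdered.add(t);
    latestById.put(id, t);
  }

  void deleteTimer(String id) {
    latestById.put(id, new SimpleTimer(id, 0L, true));
  }

  // True if a different version of this timer was recorded later in the bundle.
  private boolean isStale(SimpleTimer t) {
    SimpleTimer latest = latestById.get(t.id);
    return latest != null && !latest.equals(t);
  }

  SimpleTimer next() {
    while (!firedInBundle.isEmpty()) {
      SimpleTimer nextInBundle = firedInBundle.peekFirst();
      // Fire any still-current modified timer that is due before the next bundle timer.
      while (!modifiedOrdered.isEmpty()
          && modifiedOrdered.first().compareTo(nextInBundle) <= 0) {
        SimpleTimer earlier = modifiedOrdered.pollFirst();
        if (!isStale(earlier)) {
          deleteTimer(earlier.id);
          return earlier;
        }
      }
      nextInBundle = firedInBundle.pollFirst();
      if (!isStale(nextInBundle)) {
        deleteTimer(nextInBundle.id);
        return nextInBundle;
      }
    }
    return null; // later modified timers stay in modifiedOrdered and are not fired here
  }

  public static void main(String[] args) {
    TimerMergeSketch m =
        new TimerMergeSketch(
            Arrays.asList(
                new SimpleTimer("a", 10, false),
                new SimpleTimer("b", 20, false),
                new SimpleTimer("c", 30, false)));
    System.out.println(m.next().id); // a
    m.setTimer("d", 15); // new timer, due before b
    m.setTimer("c", 40); // c moved later, so the fired c@30 becomes stale
    System.out.println(m.next().id); // d
    System.out.println(m.next().id); // b
    System.out.println(m.next()); // null
  }
}
```

In this sketch the rescheduled `c@40` is never inserted into the fired queue; it simply remains in the modified set, which is the "timers far in the future should not be" case mentioned in point 2.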
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890550702
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-884337504
R: @kennknowles
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890415908
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889412539
R: @steveniemitz
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-881575422
R: @lcwik
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889437010
Correct - this is a long-standing bug that has bitten a number of users.
State variables are updated immediately, but timer state was only updated
between bundles. This meant that if you tried to keep the timer timestamps
in state, you would sometimes observe inconsistent results depending on how
the runner bundled things. The recent addition of Timer.clear() makes this
even worse, as you could clear a timer and still have it fire if it
was already in the bundle.
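A toy simulation of the inconsistency described above, using only the Java standard library rather than the Beam API. All names here (StaleTimerDemo, runBundle, the "deadline" state key, the "expiry" timer id) are hypothetical and only illustrate the ordering problem: the state write is visible immediately, while the timer clear is not applied until after the bundle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StaleTimerDemo {
  /**
   * Simulates one bundle. When honorInBundleChanges is false, timers queued at the start of the
   * bundle fire even if user code cleared them, which is the behavior described above.
   */
  static List<String> runBundle(boolean honorInBundleChanges) {
    Map<String, Long> state = new HashMap<>();
    Set<String> clearedInBundle = new HashSet<>();
    List<String> log = new ArrayList<>();

    // Earlier bundle: user code stored the deadline in state and set a matching timer.
    state.put("deadline", 10L);
    // The runner has already queued that timer to fire in this bundle.
    List<String> queuedTimers = Arrays.asList("expiry");

    // Element processing in this bundle: user code cancels the deadline.
    state.remove("deadline"); // the state change is visible immediately
    clearedInBundle.add("expiry"); // the timer change is only recorded for later

    // Timer delivery later in the same bundle.
    for (String timer : queuedTimers) {
      if (honorInBundleChanges && clearedInBundle.contains(timer)) {
        continue;
      }
      log.add(timer + " fired, deadline in state = " + state.get("deadline"));
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runBundle(false)); // [expiry fired, deadline in state = null]
    System.out.println(runBundle(true)); // []
  }
}
```

With honorInBundleChanges set to false, the timer callback runs against state that no longer contains the deadline it was set for; with it set to true, the cleared timer is dropped before delivery.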
On Thu, Jul 29, 2021 at 1:25 PM Steven Niemitz ***@***.***> wrote:
> it seems like the intent here is to make it so timers set/cleared in a
> bundle can modify timers that were queued to fire in the same bundle (so
> they no longer fire)? Am I understanding the problem correctly?
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-881557003
Run Java_Examples_Dataflow PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890559320
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283083
Run Java_Examples_Dataflow_Java11 PreCommit
[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890023188
yep, code looked good to me!
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890552844
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890565355
Run Java PreCommit
[GitHub] [beam] steveniemitz commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-889434662
It seems like the intent here is to let timers that are set or cleared in a bundle modify timers that were already queued to fire in the same bundle (so they no longer fire)? Am I understanding the problem correctly?
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283391
Run Portable_Python PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890426959
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-885749800
Run Java_Examples_Dataflow PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-886237591
Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890283069
Run Java_Examples_Dataflow PreCommit
[GitHub] [beam] reuvenlax commented on pull request #15123: Ensure timer consistency in runners
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #15123:
URL: https://github.com/apache/beam/pull/15123#issuecomment-890403323
Run Java Flink PortableValidatesRunner Streaming