You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/05 21:35:01 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #11924: [BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle

kennknowles commented on a change in pull request #11924:
URL: https://github.com/apache/beam/pull/11924#discussion_r436173779



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws Exception {
       ValidatesRunner.class,
       UsesTimersInParDo.class,
       UsesStatefulParDo.class,
+      UsesUnboundedPCollections.class,
       UsesStrictTimerOrdering.class
     })
     public void testEventTimeTimerOrderingWithCreate() throws Exception {
-      final int numTestElements = 100;
+      final int numTestElements = 5;

Review comment:
       Why shrink it? Does the test get really slow? Is this going to be a perf problem overall?

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -577,12 +583,21 @@ public void flushState() {
                         WindmillTimerInternals.windmillTimerToTimerData(
                             WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
                 .iterator();
+
+        cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);
+      }
+
+      Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();
+      if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+        while (!toBeFiredTimersOrdered.isEmpty()) {
+          userTimerInternals.setTimer(toBeFiredTimersOrdered.poll());
+        }
       }

Review comment:
       Yea I don't actually understand what this block is for.
   
   FWIW to do timer deletion/reset cheaply without building a bespoke data structure just keep a map from id to firing time or tombstone. This way, whenever a timer comes up in the prio queue you pull out the actual time for it from the map. If it is actually set for another time, don't fire it. If it is obsolete, don't fire it.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -577,12 +583,21 @@ public void flushState() {
                         WindmillTimerInternals.windmillTimerToTimerData(
                             WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
                 .iterator();
+
+        cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);

Review comment:
       Do we even need `cachedFiredUserTimers`? It seems obsolete if we populate the priority queue. The name is also wrong - even before this PR it wasn't a cache. It is a lazily initialized iterator. Instead, we should have a lazily initialized priority queue (like you do) and just a flag to say whether the incoming timers have been loaded yet.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4040,7 +4043,8 @@ public void onTimer(
             }
           };
 
-      PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
+      PCollection<String> output =
+          pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn));

Review comment:
       Should not be calling `setIsBoundedInternal` here. Is this just to force streaming mode? We need to just create a separate run of ValidatesRunner that forces streaming mode.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org