You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/03 20:46:38 UTC

[2/2] incubator-beam git commit: Move watermark updates to the end of handleResult

Move watermark updates to the end of handleResult

This ensures that any state modifications are visible before watermarks
advancement permits additional progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0cb3832e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0cb3832e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0cb3832e

Branch: refs/heads/master
Commit: 0cb3832eb21ffeebc33b433d57cb814957788037
Parents: 55d9519
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 21 10:13:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0cb3832e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e5a30d4..965e77d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -179,11 +179,6 @@ class EvaluationContext {
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
         committedBundles,
         outputTypes);
-    watermarkManager.updateWatermarks(
-        completedBundle,
-        result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedResult,
-        result.getWatermarkHold());
     // Commit aggregator changes
     if (result.getAggregatorChanges() != null) {
       result.getAggregatorChanges().commit();
@@ -201,6 +196,13 @@ class EvaluationContext {
         applicationStateInternals.remove(stepAndKey);
       }
     }
+    // Watermarks are updated last to ensure visibility of any global state before progress is
+    // permitted
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedResult,
+        result.getWatermarkHold());
     return committedResult;
   }