You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/03 20:46:37 UTC

[1/2] incubator-beam git commit: This closes #1154

Repository: incubator-beam
Updated Branches:
  refs/heads/master 55d951987 -> eac68cb21


This closes #1154


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eac68cb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eac68cb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eac68cb2

Branch: refs/heads/master
Commit: eac68cb217bb635649f900f57333f7fab40e77a7
Parents: 55d9519 0cb3832
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 3 13:45:34 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Move watermark updates to the end of handleResult

Posted by tg...@apache.org.
Move watermark updates to the end of handleResult

This ensures that any state modifications are visible before watermark
advancement permits additional progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0cb3832e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0cb3832e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0cb3832e

Branch: refs/heads/master
Commit: 0cb3832eb21ffeebc33b433d57cb814957788037
Parents: 55d9519
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 21 10:13:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0cb3832e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e5a30d4..965e77d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -179,11 +179,6 @@ class EvaluationContext {
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
         committedBundles,
         outputTypes);
-    watermarkManager.updateWatermarks(
-        completedBundle,
-        result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedResult,
-        result.getWatermarkHold());
     // Commit aggregator changes
     if (result.getAggregatorChanges() != null) {
       result.getAggregatorChanges().commit();
@@ -201,6 +196,13 @@ class EvaluationContext {
         applicationStateInternals.remove(stepAndKey);
       }
     }
+    // Watermarks are updated last to ensure visibility of any global state before progress is
+    // permitted
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedResult,
+        result.getWatermarkHold());
     return committedResult;
   }