You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/03 20:46:37 UTC
[1/2] incubator-beam git commit: This closes #1154
Repository: incubator-beam
Updated Branches:
refs/heads/master 55d951987 -> eac68cb21
This closes #1154
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eac68cb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eac68cb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eac68cb2
Branch: refs/heads/master
Commit: eac68cb217bb635649f900f57333f7fab40e77a7
Parents: 55d9519 0cb3832
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 3 13:45:34 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/EvaluationContext.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Move watermark updates to the end of
handleResult
Posted by tg...@apache.org.
Move watermark updates to the end of handleResult
This ensures that any state modifications are visible before watermarks
advancement permits additional progress.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0cb3832e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0cb3832e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0cb3832e
Branch: refs/heads/master
Commit: 0cb3832eb21ffeebc33b433d57cb814957788037
Parents: 55d9519
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 21 10:13:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/EvaluationContext.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0cb3832e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e5a30d4..965e77d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -179,11 +179,6 @@ class EvaluationContext {
: completedBundle.withElements((Iterable) result.getUnprocessedElements()),
committedBundles,
outputTypes);
- watermarkManager.updateWatermarks(
- completedBundle,
- result.getTimerUpdate().withCompletedTimers(completedTimers),
- committedResult,
- result.getWatermarkHold());
// Commit aggregator changes
if (result.getAggregatorChanges() != null) {
result.getAggregatorChanges().commit();
@@ -201,6 +196,13 @@ class EvaluationContext {
applicationStateInternals.remove(stepAndKey);
}
}
+ // Watermarks are updated last to ensure visibility of any global state before progress is
+ // permitted
+ watermarkManager.updateWatermarks(
+ completedBundle,
+ result.getTimerUpdate().withCompletedTimers(completedTimers),
+ committedResult,
+ result.getWatermarkHold());
return committedResult;
}