You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:53:03 UTC
[45/51] [abbrv] incubator-beam git commit: Remove References to Instant#now in the DirectRunner
Remove References to Instant#now in the DirectRunner
The DirectRunner should exclusively use the configured clock, rather than
Instant#now, to determine the processing time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7585cfc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7585cfc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7585cfc3
Branch: refs/heads/python-sdk
Commit: 7585cfc3693800b00c4ccc799c27f0311e9b0cc1
Parents: fcf6b1d
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:05 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/EvaluationContext.java | 14 ++++++++++----
.../direct/ExecutorServiceParallelExecutor.java | 5 ++---
2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 23c139d..94f28e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -48,6 +48,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
+import org.joda.time.Instant;
+
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -81,6 +83,7 @@ class EvaluationContext {
/** The options that were used to create this {@link Pipeline}. */
private final DirectOptions options;
+ private final Clock clock;
private final BundleFactory bundleFactory;
/** The current processing time and event time watermarks and timers. */
@@ -116,6 +119,7 @@ class EvaluationContext {
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
this.options = checkNotNull(options);
+ this.clock = options.getClock();
this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
@@ -123,9 +127,7 @@ class EvaluationContext {
checkNotNull(views);
this.stepNames = stepNames;
- this.watermarkManager =
- WatermarkManager.create(
- NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+ this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers);
this.sideInputContainer = SideInputContainer.create(this, views);
this.applicationStateInternals = new ConcurrentHashMap<>();
@@ -314,7 +316,7 @@ class EvaluationContext {
AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
StepAndKey stepAndKey = StepAndKey.of(application, key);
return new DirectExecutionContext(
- options.getClock(),
+ clock,
key,
(CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
watermarkManager.getWatermarks(application));
@@ -427,4 +429,8 @@ class EvaluationContext {
}
return true;
}
+
+ public Instant now() {
+ return clock.now();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 64836d8..a0a5ec0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -40,7 +40,6 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -433,9 +432,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
.createKeyedBundle(
null, keyTimers.getKey(), (PCollection) transform.getInput())
.add(WindowedValue.valueInEmptyWindows(work))
- .commit(Instant.now());
- state.set(ExecutorState.ACTIVE);
+ .commit(evaluationContext.now());
scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
+ state.set(ExecutorState.ACTIVE);
}
}
}