You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 17:40:30 UTC
[1/4] incubator-beam git commit: Remove unnecessary Assignment in
TransformExecutor
Repository: incubator-beam
Updated Branches:
refs/heads/master 2b5c6bcb2 -> c584b37b8
Remove unnecessary Assignment in TransformExecutor
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/acf71d31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/acf71d31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/acf71d31
Branch: refs/heads/master
Commit: acf71d313a43c8f39213ce19277ffebadcc40a77
Parents: 4546fd9
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:59:46 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/direct/TransformExecutor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/acf71d31/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 3db941d..d873bf5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -118,7 +118,7 @@ class TransformExecutor<T> implements Runnable {
processElements(evaluator, enforcements);
- TransformResult result = finishBundle(evaluator, enforcements);
+ finishBundle(evaluator, enforcements);
} catch (Throwable t) {
onComplete.handleThrowable(inputBundle, t);
if (t instanceof RuntimeException) {
[4/4] incubator-beam git commit: This closes #795
Posted by ke...@apache.org.
This closes #795
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c584b37b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c584b37b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c584b37b
Branch: refs/heads/master
Commit: c584b37b8ac4e863bad83a766b6871ccc7135334
Parents: 2b5c6bc acf71d3
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 10:40:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Aug 5 10:40:17 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/EvaluationContext.java | 14 ++++++---
.../direct/ExecutorServiceParallelExecutor.java | 5 ++--
.../beam/runners/direct/TransformExecutor.java | 2 +-
.../beam/runners/direct/DirectRunnerTest.java | 31 ++++++++++++++++++++
4 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Remove References to Instant#now in
the DirectRunner
Posted by ke...@apache.org.
Remove References to Instant#now in the DirectRunner
The DirectRunner should use exclusively the configured clock to
determine the processing time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7585cfc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7585cfc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7585cfc3
Branch: refs/heads/master
Commit: 7585cfc3693800b00c4ccc799c27f0311e9b0cc1
Parents: fcf6b1d
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:05 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/EvaluationContext.java | 14 ++++++++++----
.../direct/ExecutorServiceParallelExecutor.java | 5 ++---
2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 23c139d..94f28e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -48,6 +48,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
+import org.joda.time.Instant;
+
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -81,6 +83,7 @@ class EvaluationContext {
/** The options that were used to create this {@link Pipeline}. */
private final DirectOptions options;
+ private final Clock clock;
private final BundleFactory bundleFactory;
/** The current processing time and event time watermarks and timers. */
@@ -116,6 +119,7 @@ class EvaluationContext {
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
this.options = checkNotNull(options);
+ this.clock = options.getClock();
this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
@@ -123,9 +127,7 @@ class EvaluationContext {
checkNotNull(views);
this.stepNames = stepNames;
- this.watermarkManager =
- WatermarkManager.create(
- NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+ this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers);
this.sideInputContainer = SideInputContainer.create(this, views);
this.applicationStateInternals = new ConcurrentHashMap<>();
@@ -314,7 +316,7 @@ class EvaluationContext {
AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
StepAndKey stepAndKey = StepAndKey.of(application, key);
return new DirectExecutionContext(
- options.getClock(),
+ clock,
key,
(CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
watermarkManager.getWatermarks(application));
@@ -427,4 +429,8 @@ class EvaluationContext {
}
return true;
}
+
+ public Instant now() {
+ return clock.now();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 64836d8..a0a5ec0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -40,7 +40,6 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -433,9 +432,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
.createKeyedBundle(
null, keyTimers.getKey(), (PCollection) transform.getInput())
.add(WindowedValue.valueInEmptyWindows(work))
- .commit(Instant.now());
- state.set(ExecutorState.ACTIVE);
+ .commit(evaluationContext.now());
scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
+ state.set(ExecutorState.ACTIVE);
}
}
}
[2/4] incubator-beam git commit: Add DirectRunner Reuse Test
Posted by ke...@apache.org.
Add DirectRunner Reuse Test
Two calls to run using the Direct Runner should be independent and
succeed independently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4546fd9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4546fd9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4546fd9c
Branch: refs/heads/master
Commit: 4546fd9c5e073eb33787faa302b8695dfd6e04aa
Parents: 7585cfc
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunnerTest.java | 31 ++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4546fd9c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 29dea32..1e73ec0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -109,6 +109,37 @@ public class DirectRunnerTest implements Serializable {
result.awaitCompletion();
}
+ @Test
+ public void reusePipelineSucceeds() throws Throwable {
+ Pipeline p = getPipeline();
+
+ PCollection<KV<String, Long>> counts =
+ p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+ .apply(MapElements.via(new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input;
+ }
+ }))
+ .apply(Count.<String>perElement());
+ PCollection<String> countStrs =
+ counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+ @Override
+ public String apply(KV<String, Long> input) {
+ String str = String.format("%s: %s", input.getKey(), input.getValue());
+ return str;
+ }
+ }));
+
+ PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+ DirectPipelineResult result = ((DirectPipelineResult) p.run());
+ result.awaitCompletion();
+
+ DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
+ otherResult.awaitCompletion();
+ }
+
@Test(timeout = 5000L)
public void byteArrayCountShouldSucceed() {
Pipeline p = getPipeline();