You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/04 07:55:29 UTC
[1/3] beam git commit: Test waitUntilFinish(Duration) in the
DirectRunner
Repository: beam
Updated Branches:
refs/heads/master de36e8398 -> cc8e0b9df
Test waitUntilFinish(Duration) in the DirectRunner
Ensures that the call to "waitUntilFinish(Duration)" terminates before
the Pipeline completes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b4541a18
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b4541a18
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b4541a18
Branch: refs/heads/master
Commit: b4541a18cf447fed2b2150a99be1d892e1f8e358
Parents: de36e83
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 3 10:12:58 2017 -0700
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 09:45:19 2017 +0200
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunnerTest.java | 27 ++++++++++++++++++++
1 file changed, 27 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b4541a18/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index e601fcf..f1c0eb2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -277,6 +278,32 @@ public class DirectRunnerTest implements Serializable {
}
@Test
+ public void waitUntilFinishTimeout() throws Exception {
+ DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+ options.setBlockOnRun(false);
+ options.setRunner(DirectRunner.class);
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(1L))
+ .apply(
+ ParDo.of(
+ new DoFn<Long, Long>() {
+ @ProcessElement
+ public void hang(ProcessContext context) throws InterruptedException {
+ // Hangs "forever"
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }));
+ PipelineResult result = p.run();
+ // The pipeline should never complete;
+ assertThat(result.getState(), is(State.RUNNING));
+ // Must time out, otherwise this test will never complete
+ result.waitUntilFinish(Duration.millis(1L));
+ assertThat(result.getState(), is(State.RUNNING));
+
+ result.cancel();
+ }
+
+ @Test
public void transformDisplayDataExceptionShouldFail() {
DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
@ProcessElement
[3/3] beam git commit: This closes #2408
Posted by ie...@apache.org.
This closes #2408
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc8e0b9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc8e0b9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc8e0b9d
Branch: refs/heads/master
Commit: cc8e0b9df1b54f1776fea6b7b593782b7e515ae5
Parents: de36e83 866e08f
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 4 09:53:45 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 09:53:45 2017 +0200
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunnerTest.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
[2/3] beam git commit: Fix style on waitUntilFinish(Duration) test
for the DirectRunner. The call to .cancel() is removed too;
it is apparently unneeded.
Posted by ie...@apache.org.
Fix style on waitUntilFinish(Duration) test for the DirectRunner
The call to .cancel() is removed too, it is apparently unneeded.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/866e08f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/866e08f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/866e08f4
Branch: refs/heads/master
Commit: 866e08f4a11ccc17fc785817582e51f7982b8a7d
Parents: b4541a1
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 4 09:51:58 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 09:51:58 2017 +0200
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunnerTest.java | 24 +++++++++-----------
1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/866e08f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index f1c0eb2..28c24ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -278,29 +278,27 @@ public class DirectRunnerTest implements Serializable {
}
@Test
- public void waitUntilFinishTimeout() throws Exception {
+ public void testWaitUntilFinishTimeout() throws Exception {
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setBlockOnRun(false);
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
- p.apply(Create.of(1L))
- .apply(
- ParDo.of(
- new DoFn<Long, Long>() {
- @ProcessElement
- public void hang(ProcessContext context) throws InterruptedException {
- // Hangs "forever"
- Thread.sleep(Long.MAX_VALUE);
- }
- }));
+ p
+ .apply(Create.of(1L))
+ .apply(ParDo.of(
+ new DoFn<Long, Long>() {
+ @ProcessElement
+ public void hang(ProcessContext context) throws InterruptedException {
+ // Hangs "forever"
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }));
PipelineResult result = p.run();
// The pipeline should never complete;
assertThat(result.getState(), is(State.RUNNING));
// Must time out, otherwise this test will never complete
result.waitUntilFinish(Duration.millis(1L));
assertThat(result.getState(), is(State.RUNNING));
-
- result.cancel();
}
@Test