You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/04 07:55:29 UTC

[1/3] beam git commit: Test waitUntilFinish(Duration) in the DirectRunner

Repository: beam
Updated Branches:
  refs/heads/master de36e8398 -> cc8e0b9df


Test waitUntilFinish(Duration) in the DirectRunner

Ensures that the call to "waitUntilFinish(Duration)" terminates before
the Pipeline completes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b4541a18
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b4541a18
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b4541a18

Branch: refs/heads/master
Commit: b4541a18cf447fed2b2150a99be1d892e1f8e358
Parents: de36e83
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 3 10:12:58 2017 -0700
Committer: Isma�l Mej�a <ie...@apache.org>
Committed: Tue Apr 4 09:45:19 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 27 ++++++++++++++++++++
 1 file changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b4541a18/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index e601fcf..f1c0eb2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -277,6 +278,32 @@ public class DirectRunnerTest implements Serializable {
   }
 
   @Test
+  public void waitUntilFinishTimeout() throws Exception {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(1L))
+        .apply(
+            ParDo.of(
+                new DoFn<Long, Long>() {
+                  @ProcessElement
+                  public void hang(ProcessContext context) throws InterruptedException {
+                    // Hangs "forever"
+                    Thread.sleep(Long.MAX_VALUE);
+                  }
+                }));
+    PipelineResult result = p.run();
+    // The pipeline should never complete;
+    assertThat(result.getState(), is(State.RUNNING));
+    // Must time out, otherwise this test will never complete
+    result.waitUntilFinish(Duration.millis(1L));
+    assertThat(result.getState(), is(State.RUNNING));
+
+    result.cancel();
+  }
+
+  @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
       @ProcessElement


[3/3] beam git commit: This closes #2408

Posted by ie...@apache.org.
This closes #2408


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc8e0b9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc8e0b9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc8e0b9d

Branch: refs/heads/master
Commit: cc8e0b9df1b54f1776fea6b7b593782b7e515ae5
Parents: de36e83 866e08f
Author: Isma�l Mej�a <ie...@apache.org>
Authored: Tue Apr 4 09:53:45 2017 +0200
Committer: Isma�l Mej�a <ie...@apache.org>
Committed: Tue Apr 4 09:53:45 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------



[2/3] beam git commit: Fix style on waitUntilFinish(Duration) test for the DirectRunner The call to .cancel() is removed too, it is apparently unneeded.

Posted by ie...@apache.org.
Fix style on waitUntilFinish(Duration) test for the DirectRunner
The call to .cancel() is removed too, it is apparently unneeded.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/866e08f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/866e08f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/866e08f4

Branch: refs/heads/master
Commit: 866e08f4a11ccc17fc785817582e51f7982b8a7d
Parents: b4541a1
Author: Isma�l Mej�a <ie...@apache.org>
Authored: Tue Apr 4 09:51:58 2017 +0200
Committer: Isma�l Mej�a <ie...@apache.org>
Committed: Tue Apr 4 09:51:58 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 24 +++++++++-----------
 1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/866e08f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index f1c0eb2..28c24ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -278,29 +278,27 @@ public class DirectRunnerTest implements Serializable {
   }
 
   @Test
-  public void waitUntilFinishTimeout() throws Exception {
+  public void testWaitUntilFinishTimeout() throws Exception {
     DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
     options.setBlockOnRun(false);
     options.setRunner(DirectRunner.class);
     Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(1L))
-        .apply(
-            ParDo.of(
-                new DoFn<Long, Long>() {
-                  @ProcessElement
-                  public void hang(ProcessContext context) throws InterruptedException {
-                    // Hangs "forever"
-                    Thread.sleep(Long.MAX_VALUE);
-                  }
-                }));
+    p
+      .apply(Create.of(1L))
+      .apply(ParDo.of(
+          new DoFn<Long, Long>() {
+            @ProcessElement
+            public void hang(ProcessContext context) throws InterruptedException {
+              // Hangs "forever"
+              Thread.sleep(Long.MAX_VALUE);
+            }
+          }));
     PipelineResult result = p.run();
     // The pipeline should never complete;
     assertThat(result.getState(), is(State.RUNNING));
     // Must time out, otherwise this test will never complete
     result.waitUntilFinish(Duration.millis(1L));
     assertThat(result.getState(), is(State.RUNNING));
-
-    result.cancel();
   }
 
   @Test