You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/11/21 17:59:25 UTC

[1/3] beam git commit: Minor cleanup.

Repository: beam
Updated Branches:
  refs/heads/master ed40093d8 -> fb41b2950


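Minor cleanup: categorize PipelineRunnerTest.testRunPTransform() as NeedsRunner
instead of ValidatesRunner, and add a comment explaining why the test checks counters.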


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b8abfba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b8abfba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b8abfba

Branch: refs/heads/master
Commit: 1b8abfba39e1c820255dd4436d14e4057407f738
Parents: 9bd5e6f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Nov 20 12:34:40 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:29 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/runners/PipelineRunnerTest.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1b8abfba/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index e7cca28..94c77ae 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -77,7 +77,7 @@ public class PipelineRunnerTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category(NeedsRunner.class)
   public void testRunPTransform() {
     final String namespace = PipelineRunnerTest.class.getName();
     final Counter counter = Metrics.counter(namespace, "count");
@@ -94,6 +94,7 @@ public class PipelineRunnerTest {
         }
     );
 
+    // Checking counters to verify the pipeline actually ran.
     assertThat(
         result.metrics().queryMetrics(
             MetricsFilter.builder()


[3/3] beam git commit: Closes #4123

Posted by ro...@apache.org.
Closes #4123


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb41b295
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb41b295
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb41b295

Branch: refs/heads/master
Commit: fb41b2950fa2f59f5ff7496af6990c66a715910e
Parents: ed40093 1b8abfb
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 09:57:42 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:42 2017 -0800

----------------------------------------------------------------------
 .../direct/DirectRunnerApiSurfaceTest.java      |  3 +-
 .../org/apache/beam/sdk/PipelineRunner.java     | 28 +++++++++
 .../beam/sdk/runners/PipelineRunnerTest.java    | 65 ++++++++++++++++++++
 3 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/3] beam git commit: Add option to run PTransform steps from PipelineRunner

Posted by ro...@apache.org.
Add option to run PTransform steps from PipelineRunner

Added PipelineRunner.create() and the PipelineRunner.run(PTransform) and
PipelineRunner.run(PTransform, PipelineOptions) methods.
Added PipelineRunnerTest.testRunPTransform() test case.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9bd5e6f9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9bd5e6f9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9bd5e6f9

Branch: refs/heads/master
Commit: 9bd5e6f9f61b75401961edacf9638bd7d200b4e6
Parents: ed40093
Author: Brian Foo <bf...@bfoo-macbookpro.roam.corp.google.com>
Authored: Thu Nov 9 19:57:19 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:29 2017 -0800

----------------------------------------------------------------------
 .../direct/DirectRunnerApiSurfaceTest.java      |  3 +-
 .../org/apache/beam/sdk/PipelineRunner.java     | 28 +++++++++
 .../beam/sdk/runners/PipelineRunnerTest.java    | 64 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
index f116709..cebd08a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
@@ -45,7 +45,8 @@ public class DirectRunnerApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam.sdk",
             "org.apache.beam.runners.direct",
-            "org.joda.time");
+            "org.joda.time",
+            "javax.annotation");
 
     final Package thisPackage = getClass().getPackage();
     final ClassLoader thisClassLoader = getClass().getClassLoader();

http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index c114501..5acf1f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PBegin;
 
 /**
  * A {@link PipelineRunner} runs a {@link Pipeline}.
@@ -54,8 +57,33 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
   }
 
   /**
+   * Creates a runner from the default app {@link PipelineOptions}.
+   *
+   * @return The newly created runner.
+   */
+  public static PipelineRunner<? extends PipelineResult> create() {
+    return fromOptions(PipelineOptionsFactory.create());
+  }
+
+  /**
    * Processes the given {@link Pipeline}, potentially asynchronously, returning a runner-specific
    * type of result.
    */
   public abstract ResultT run(Pipeline pipeline);
+
+  /**
+   * Creates a {@link Pipeline} out of a single {@link PTransform} step, and executes it.
+   */
+  public ResultT run(PTransform<PBegin, ?> pTransform, PipelineOptions options) {
+    Pipeline p = Pipeline.create(options);
+    p.apply(pTransform);
+    return run(p);
+  }
+
+  /**
+   * Overloaded {@link PTransform} runner that runs with the default app {@link PipelineOptions}.
+   */
+  public ResultT run(PTransform<PBegin, ?> pTransform) {
+    return run(pTransform, PipelineOptionsFactory.create());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index c5d7fbf..e7cca28 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -17,13 +17,31 @@
  */
 package org.apache.beam.sdk.runners;
 
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -39,4 +57,50 @@ public class PipelineRunnerTest {
     PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
     assertTrue(runner instanceof CrashingRunner);
   }
+
+  private static class ScaleFn<T extends Number>
+      extends SimpleFunction<T, Double> {
+
+    private final double scalar;
+    private final Counter counter;
+
+    public ScaleFn(double scalar, Counter counter) {
+      this.scalar = scalar;
+      this.counter = counter;
+    }
+
+    @Override
+    public Double apply(T input) {
+      counter.inc();
+      return scalar * input.doubleValue();
+    }
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testRunPTransform() {
+    final String namespace = PipelineRunnerTest.class.getName();
+    final Counter counter = Metrics.counter(namespace, "count");
+    final PipelineResult result = PipelineRunner.create().run(
+        new PTransform<PBegin, POutput>() {
+          @Override
+          public POutput expand(PBegin input) {
+            PCollection<Double> output = input
+                .apply(Create.<Integer>of(1, 2, 3, 4))
+                .apply("ScaleByTwo", MapElements.via(new ScaleFn<Integer>(2.0, counter)));
+            PAssert.that(output).containsInAnyOrder(2.0, 4.0, 6.0, 8.0);
+            return output;
+          }
+        }
+    );
+
+    assertThat(
+        result.metrics().queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(MetricNameFilter.inNamespace(namespace))
+                .build()
+        ).counters(),
+        hasItem(metricsResult(namespace, "count", "ScaleByTwo", 4L, true))
+    );
+  }
 }