You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/11/21 17:59:25 UTC
[1/3] beam git commit: Minor cleanup.
Repository: beam
Updated Branches:
refs/heads/master ed40093d8 -> fb41b2950
Minor cleanup.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b8abfba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b8abfba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b8abfba
Branch: refs/heads/master
Commit: 1b8abfba39e1c820255dd4436d14e4057407f738
Parents: 9bd5e6f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Nov 20 12:34:40 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:29 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/runners/PipelineRunnerTest.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1b8abfba/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index e7cca28..94c77ae 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -77,7 +77,7 @@ public class PipelineRunnerTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category(NeedsRunner.class)
public void testRunPTransform() {
final String namespace = PipelineRunnerTest.class.getName();
final Counter counter = Metrics.counter(namespace, "count");
@@ -94,6 +94,7 @@ public class PipelineRunnerTest {
}
);
+ // Checking counters to verify the pipeline actually ran.
assertThat(
result.metrics().queryMetrics(
MetricsFilter.builder()
[3/3] beam git commit: Closes #4123
Posted by ro...@apache.org.
Closes #4123
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb41b295
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb41b295
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb41b295
Branch: refs/heads/master
Commit: fb41b2950fa2f59f5ff7496af6990c66a715910e
Parents: ed40093 1b8abfb
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 09:57:42 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:42 2017 -0800
----------------------------------------------------------------------
.../direct/DirectRunnerApiSurfaceTest.java | 3 +-
.../org/apache/beam/sdk/PipelineRunner.java | 28 +++++++++
.../beam/sdk/runners/PipelineRunnerTest.java | 65 ++++++++++++++++++++
3 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/3] beam git commit: Add option to run PTransform steps from PipelineRunner
Posted by ro...@apache.org.
Add option to run PTransform steps from PipelineRunner
Added PipelineRunner.run(PTransform) method.
Added PipelineRunnerTest.testRunPTransform() test case.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9bd5e6f9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9bd5e6f9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9bd5e6f9
Branch: refs/heads/master
Commit: 9bd5e6f9f61b75401961edacf9638bd7d200b4e6
Parents: ed40093
Author: Brian Foo <bf...@bfoo-macbookpro.roam.corp.google.com>
Authored: Thu Nov 9 19:57:19 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 09:57:29 2017 -0800
----------------------------------------------------------------------
.../direct/DirectRunnerApiSurfaceTest.java | 3 +-
.../org/apache/beam/sdk/PipelineRunner.java | 28 +++++++++
.../beam/sdk/runners/PipelineRunnerTest.java | 64 ++++++++++++++++++++
3 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
index f116709..cebd08a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerApiSurfaceTest.java
@@ -45,7 +45,8 @@ public class DirectRunnerApiSurfaceTest {
ImmutableSet.of(
"org.apache.beam.sdk",
"org.apache.beam.runners.direct",
- "org.joda.time");
+ "org.joda.time",
+ "javax.annotation");
final Package thisPackage = getClass().getPackage();
final ClassLoader thisClassLoader = getClass().getClassLoader();
http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index c114501..5acf1f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PBegin;
/**
* A {@link PipelineRunner} runs a {@link Pipeline}.
@@ -54,8 +57,33 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
}
/**
+ * Creates a runner from the default app {@link PipelineOptions}.
+ *
+ * @return The newly created runner.
+ */
+ public static PipelineRunner<? extends PipelineResult> create() {
+ return fromOptions(PipelineOptionsFactory.create());
+ }
+
+ /**
* Processes the given {@link Pipeline}, potentially asynchronously, returning a runner-specific
* type of result.
*/
public abstract ResultT run(Pipeline pipeline);
+
+ /**
+ * Creates a {@link Pipeline} out of a single {@link PTransform} step, and executes it.
+ */
+ public ResultT run(PTransform<PBegin, ?> pTransform, PipelineOptions options) {
+ Pipeline p = Pipeline.create(options);
+ p.apply(pTransform);
+ return run(p);
+ }
+
+ /**
+ * Overloaded {@link PTransform} runner that runs with the default app {@link PipelineOptions}.
+ */
+ public ResultT run(PTransform<PBegin, ?> pTransform) {
+ return run(pTransform, PipelineOptionsFactory.create());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9bd5e6f9/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index c5d7fbf..e7cca28 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -17,13 +17,31 @@
*/
package org.apache.beam.sdk.runners;
+import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -39,4 +57,50 @@ public class PipelineRunnerTest {
PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
assertTrue(runner instanceof CrashingRunner);
}
+
+ private static class ScaleFn<T extends Number>
+ extends SimpleFunction<T, Double> {
+
+ private final double scalar;
+ private final Counter counter;
+
+ public ScaleFn(double scalar, Counter counter) {
+ this.scalar = scalar;
+ this.counter = counter;
+ }
+
+ @Override
+ public Double apply(T input) {
+ counter.inc();
+ return scalar * input.doubleValue();
+ }
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testRunPTransform() {
+ final String namespace = PipelineRunnerTest.class.getName();
+ final Counter counter = Metrics.counter(namespace, "count");
+ final PipelineResult result = PipelineRunner.create().run(
+ new PTransform<PBegin, POutput>() {
+ @Override
+ public POutput expand(PBegin input) {
+ PCollection<Double> output = input
+ .apply(Create.<Integer>of(1, 2, 3, 4))
+ .apply("ScaleByTwo", MapElements.via(new ScaleFn<Integer>(2.0, counter)));
+ PAssert.that(output).containsInAnyOrder(2.0, 4.0, 6.0, 8.0);
+ return output;
+ }
+ }
+ );
+
+ assertThat(
+ result.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.inNamespace(namespace))
+ .build()
+ ).counters(),
+ hasItem(metricsResult(namespace, "count", "ScaleByTwo", 4L, true))
+ );
+ }
}