You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:41 UTC
[48/50] [abbrv] incubator-beam git commit: Configure
RunnableOnService tests for Spark runner, batch mode
Configure RunnableOnService tests for Spark runner, batch mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30d226a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30d226a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30d226a3
Branch: refs/heads/python-sdk
Commit: 30d226a3ae547c4a2d890d1d42487862323a4ae3
Parents: a24e557
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 5 15:11:07 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:31 2016 -0700
----------------------------------------------------------------------
runners/spark/pom.xml | 112 +++++++++++++------
.../runners/spark/SparkRunnerRegistrar.java | 3 +-
.../runners/spark/TestSparkPipelineRunner.java | 77 +++++++++++++
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
4 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30d226a3/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e7d0834..747464e 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -37,6 +37,62 @@
<spark.version>1.6.1</spark.version>
</properties>
+ <profiles>
+ <profile>
+ <id>jacoco</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
+ <!-- This profile adds execution of RunnableOnService integration tests
+ against a local Spark endpoint. -->
+ <id>runnable-on-service-tests</id>
+ <activation><activeByDefault>false</activeByDefault></activation>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>runnable-on-service-tests</id>
+ <configuration>
+ <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency> <!-- the tests-classifier jar declared in this POM -->
+ </dependenciesToScan>
+ <excludes>
+ org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest
+ </excludes>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner",
+ "--streaming=false"
+ ]
+ </beamTestPipelineOptions>
+ <dataflow.spark.test.reuseSparkContext>true</dataflow.spark.test.reuseSparkContext>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -122,6 +178,25 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
+ </dependency>
+
+ <!-- Depend on test jar to scan for RunnableOnService tests -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@@ -237,41 +312,4 @@
</plugins>
</build>
- <profiles>
- <profile>
- <id>jacoco</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
- <id>disable-runnable-on-service-tests</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <executions>
- <execution>
- <id>runnable-on-service-tests</id>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30d226a3/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 9537ec6..baa2241 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -43,7 +43,8 @@ public final class SparkRunnerRegistrar {
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class);
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+ SparkPipelineRunner.class, TestSparkPipelineRunner.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30d226a3/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
new file mode 100644
index 0000000..d11d1c1
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PipelineRunner} for test executions that wraps a {@link SparkPipelineRunner} and
+ * forwards every operation to it unchanged.
+ *
+ * <p>The wrapped runner is built from the same options via
+ * {@link SparkPipelineRunner#fromOptions}, so applying transforms and running a pipeline behave
+ * exactly as they do on the wrapped runner. To use it, name this class as the runner:
+ *
+ * <pre>{@code
+ * --runner=org.apache.beam.runners.spark.TestSparkPipelineRunner
+ * }</pre>
+ *
+ * <p>Create instances with {@link #fromOptions(PipelineOptions)}; the constructor is private.
+ */
+public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> {
+
+  /** Runner that performs the actual work; assigned once at construction. */
+  private final SparkPipelineRunner delegate;
+
+  private TestSparkPipelineRunner(SparkPipelineOptions options) {
+    this.delegate = SparkPipelineRunner.fromOptions(options);
+  }
+
+  /**
+   * Creates a test runner from the given options.
+   *
+   * @param options options that must validate as {@link SparkPipelineOptions}
+   * @return a runner delegating to a {@link SparkPipelineRunner} built from those options
+   */
+  public static TestSparkPipelineRunner fromOptions(PipelineOptions options) {
+    // Default options suffice to set it up as a test runner
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    return new TestSparkPipelineRunner(sparkOptions);
+  }
+
+  /** Applies {@code transform} to {@code input} through the wrapped runner. */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  /** Runs {@code pipeline} on the wrapped runner and returns its {@link EvaluationResult}. */
+  @Override
+  public EvaluationResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30d226a3/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 88f4a06..d2e57aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest {
@Test
public void testRunners() {
- assertEquals(ImmutableList.of(SparkPipelineRunner.class),
+ assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class),
new SparkRunnerRegistrar.Runner().getPipelineRunners());
}