You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/10 00:07:15 UTC

[1/3] beam git commit: Add per-runner profile to Java 8 examples

Repository: beam
Updated Branches:
  refs/heads/master c4adbd371 -> b9ea415b6


Add per-runner profile to Java 8 examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/038febbd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/038febbd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/038febbd

Branch: refs/heads/master
Commit: 038febbd889cc98c5f6e579b31f84e19ff3981f5
Parents: 95d33c5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 9 13:02:46 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Tue May 9 17:06:57 2017 -0700

----------------------------------------------------------------------
 examples/java8/pom.xml | 83 +++++++++++++++++++++++++++++----------------
 1 file changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/038febbd/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index 5a94bee..3c9ed71 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -41,21 +41,11 @@
 
   <profiles>
     <!--
-      A default profile that includes optional dependencies
-      on all of our runners. This is aimed at making it very
-      easy for users to run examples with any runner without
-      any configuration. It is not needed or desirable when
-      testing the examples with a particular runner.
-
-      This profile can be disabled on the command line
-      by specifying -P !include-runners.
-
-      This profile cannot be lifted to examples-parent because
-      it would be automatically deactivated when the Java 8
-      profile were activated.
+      The direct runner is available by default.
+      You can also include it on the classpath explicitly with -P direct-runner
     -->
     <profile>
-      <id>include-runners</id>
+      <id>direct-runner</id>
       <activation>
         <activeByDefault>true</activeByDefault>
       </activation>
@@ -63,41 +53,72 @@
         <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-direct-java</artifactId>
-          <version>${project.version}</version>
           <scope>runtime</scope>
-          <optional>true</optional>
         </dependency>
+      </dependencies>
+    </profile>
 
+    <!-- Include the Apache Apex runner with -P apex-runner -->
+    <profile>
+      <id>apex-runner</id>
+      <dependencies>
         <dependency>
           <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-flink_2.10</artifactId>
-          <version>${project.version}</version>
+          <artifactId>beam-runners-apex</artifactId>
           <scope>runtime</scope>
-          <optional>true</optional>
         </dependency>
+        <!--
+          Apex depends on httpclient version 4.3.5, but the project has a transitive dependency on httpclient 4.0.1
+          from google-http-client. The Apex dependency version is specified explicitly so that it gets picked up.
+          This can be removed when the project no longer has a dependency on a different httpclient version.
+        -->
+        <dependency>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+          <version>4.3.5</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>commons-codec</groupId>
+              <artifactId>commons-codec</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
 
+    <!-- Include the Apache Flink runner with -P flink-runner -->
+    <profile>
+      <id>flink-runner</id>
+      <dependencies>
         <dependency>
           <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
-          <version>${project.version}</version>
+          <artifactId>beam-runners-flink_2.10</artifactId>
           <scope>runtime</scope>
-          <optional>true</optional>
         </dependency>
+      </dependencies>
+    </profile>
 
+    <!-- Include the Apache Spark runner with -P spark-runner -->
+    <profile>
+      <id>spark-runner</id>
+      <dependencies>
         <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-spark</artifactId>
-          <version>${project.version}</version>
           <scope>runtime</scope>
-          <optional>true</optional>
         </dependency>
-
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+        </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
           <version>${spark.version}</version>
           <scope>runtime</scope>
-          <optional>true</optional>
           <exclusions>
             <exclusion>
               <groupId>org.slf4j</groupId>
@@ -105,13 +126,17 @@
             </exclusion>
           </exclusions>
         </dependency>
+      </dependencies>
+    </profile>
 
+    <!-- Include the Google Cloud Dataflow runner with -P dataflow-runner -->
+    <profile>
+      <id>dataflow-runner</id>
+      <dependencies>
         <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
           <scope>runtime</scope>
-          <optional>true</optional>
         </dependency>
       </dependencies>
     </profile>


[2/3] beam git commit: Register TestSparkPipelineOptions only in src/test to avoid hard hamcrest dep

Posted by da...@apache.org.
Register TestSparkPipelineOptions only in src/test to avoid hard hamcrest dep


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95d33c52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95d33c52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95d33c52

Branch: refs/heads/master
Commit: 95d33c521788ce8046201a6baf22e46950560cf1
Parents: c4adbd3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 9 12:44:19 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Tue May 9 17:06:57 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerRegistrar.java     |  4 +--
 .../beam/runners/spark/TestSparkRunner.java     | 29 ++++++++--------
 .../runners/spark/SparkRunnerRegistrarTest.java |  2 +-
 .../TestSparkPipelineOptionsRegistrar.java      | 36 ++++++++++++++++++++
 4 files changed, 52 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index e2e5ceb..325c86d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,9 +54,7 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(
-          SparkPipelineOptions.class,
-          TestSparkPipelineOptions.class);
+      return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 6d10b75..eccee57 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -83,26 +83,25 @@ import org.slf4j.LoggerFactory;
 public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
-  private final TestSparkPipelineOptions testSparkPipelineOptions;
-
+  private final PipelineOptions options;
   private SparkRunner delegate;
-  private boolean isForceStreaming;
 
-  private TestSparkRunner(TestSparkPipelineOptions options) {
+  private TestSparkRunner(PipelineOptions options) {
     this.delegate = SparkRunner.fromOptions(options);
-    this.isForceStreaming = options.isForceStreaming();
-    this.testSparkPipelineOptions = options;
+    this.options = options;
   }
 
   public static TestSparkRunner fromOptions(PipelineOptions options) {
-    // Default options suffice to set it up as a test runner
-    TestSparkPipelineOptions sparkOptions =
-        PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
-    return new TestSparkRunner(sparkOptions);
+    return new TestSparkRunner(options);
   }
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
+    // Default options suffice to set it up as a test runner
+    TestSparkPipelineOptions testSparkOptions =
+        PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
+
+    boolean isForceStreaming = testSparkOptions.isForceStreaming();
     // if the pipeline forces execution as a streaming pipeline,
     // and the source is an adapted unbounded source (as bounded),
     // read it as unbounded source via UnboundedReadFromBoundedSource.
@@ -116,13 +115,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     MetricsAccumulator.clear();
     GlobalWatermarkHolder.clear();
 
-    LOG.info("About to run test pipeline " + testSparkPipelineOptions.getJobName());
+    LOG.info("About to run test pipeline " + options.getJobName());
 
     // if the pipeline was executed in streaming mode, validate aggregators.
     if (isForceStreaming) {
       try {
         result = delegate.run(pipeline);
-        awaitWatermarksOrTimeout(testSparkPipelineOptions, result);
+        awaitWatermarksOrTimeout(testSparkOptions, result);
         result.stop();
         PipelineResult.State finishState = result.getState();
         // assert finish state.
@@ -133,7 +132,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
       } finally {
         try {
           // cleanup checkpoint dir.
-          FileUtils.deleteDirectory(new File(testSparkPipelineOptions.getCheckpointDir()));
+          FileUtils.deleteDirectory(new File(testSparkOptions.getCheckpointDir()));
         } catch (IOException e) {
           throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
         }
@@ -150,8 +149,8 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
           finishState,
           is(PipelineResult.State.DONE));
       // assert via matchers.
-      assertThat(result, testSparkPipelineOptions.getOnCreateMatcher());
-      assertThat(result, testSparkPipelineOptions.getOnSuccessMatcher());
+      assertThat(result, testSparkOptions.getOnCreateMatcher());
+      assertThat(result, testSparkOptions.getOnSuccessMatcher());
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 75899f9..4e1fd7c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..e71880b
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * A registrar for {@link TestSparkPipelineOptions} to temporarily work around some complexities in
+ * {@link PipelineOptions} parsing.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public final class TestSparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(TestSparkPipelineOptions.class);
+  }
+}


[3/3] beam git commit: This closes #3002

Posted by da...@apache.org.
This closes #3002


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b9ea415b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b9ea415b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b9ea415b

Branch: refs/heads/master
Commit: b9ea415b666a1ffb5fb44a2b05ec2a028401d2fa
Parents: c4adbd3 038febb
Author: Davor Bonaci <da...@google.com>
Authored: Tue May 9 17:07:06 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Tue May 9 17:07:06 2017 -0700

----------------------------------------------------------------------
 examples/java8/pom.xml                          | 83 +++++++++++++-------
 .../runners/spark/SparkRunnerRegistrar.java     |  4 +-
 .../beam/runners/spark/TestSparkRunner.java     | 29 ++++---
 .../runners/spark/SparkRunnerRegistrarTest.java |  2 +-
 .../TestSparkPipelineOptionsRegistrar.java      | 36 +++++++++
 5 files changed, 106 insertions(+), 48 deletions(-)
----------------------------------------------------------------------