You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/14 01:08:57 UTC
[1/6] incubator-beam git commit: Remove PubsubFileInjector
Repository: incubator-beam
Updated Branches:
refs/heads/master e3768f6da -> e9a08e454
Remove PubsubFileInjector
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1047e033
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1047e033
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1047e033
Branch: refs/heads/master
Commit: 1047e03383b535124b36ec09f900f4e8e3df3a23
Parents: 7fbe410
Author: Pei He <pe...@google.com>
Authored: Mon Sep 12 16:29:47 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:08 2016 -0700
----------------------------------------------------------------------
.../main/java/common/PubsubFileInjector.java | 153 -------------------
1 file changed, 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1047e033/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
deleted file mode 100644
index 6ca20f3..0000000
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.Transport;
-
-/**
- * A batch Dataflow pipeline for injecting a set of GCS files into
- * a PubSub topic line by line. Empty lines are skipped.
- *
- * <p>This is useful for testing streaming
- * pipelines. Note that since batch pipelines might retry chunks, this
- * does _not_ guarantee exactly-once injection of file data. Some lines may
- * be published multiple times.
- * </p>
- */
-public class PubsubFileInjector {
-
- /**
- * An incomplete {@code PubsubFileInjector} transform with unbound output topic.
- */
- public static class Unbound {
- private final String timestampLabelKey;
-
- Unbound() {
- this.timestampLabelKey = null;
- }
-
- Unbound(String timestampLabelKey) {
- this.timestampLabelKey = timestampLabelKey;
- }
-
- Unbound withTimestampLabelKey(String timestampLabelKey) {
- return new Unbound(timestampLabelKey);
- }
-
- public Bound publish(String outputTopic) {
- return new Bound(outputTopic, timestampLabelKey);
- }
- }
-
- /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
- public static class Bound extends OldDoFn<String, Void> {
- private final String outputTopic;
- private final String timestampLabelKey;
- public transient Pubsub pubsub;
-
- public Bound(String outputTopic, String timestampLabelKey) {
- this.outputTopic = outputTopic;
- this.timestampLabelKey = timestampLabelKey;
- }
-
- @Override
- public void startBundle(Context context) {
- this.pubsub =
- Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class))
- .build();
- }
-
- @Override
- public void processElement(ProcessContext c) throws IOException {
- if (c.element().isEmpty()) {
- return;
- }
- PubsubMessage pubsubMessage = new PubsubMessage();
- pubsubMessage.encodeData(c.element().getBytes());
- if (timestampLabelKey != null) {
- pubsubMessage.setAttributes(
- ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis())));
- }
- PublishRequest publishRequest = new PublishRequest();
- publishRequest.setMessages(Arrays.asList(pubsubMessage));
- this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
- }
- }
-
- /**
- * Creates a {@code PubsubFileInjector} transform with the given timestamp label key.
- */
- public static Unbound withTimestampLabelKey(String timestampLabelKey) {
- return new Unbound(timestampLabelKey);
- }
-
- /**
- * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic.
- */
- public static Bound publish(String outputTopic) {
- return new Unbound().publish(outputTopic);
- }
-
- /**
- * Command line parameter options.
- */
- private interface PubsubFileInjectorOptions extends PipelineOptions {
- @Description("GCS location of files.")
- @Validation.Required
- String getInput();
- void setInput(String value);
-
- @Description("Topic to publish on.")
- @Validation.Required
- String getOutputTopic();
- void setOutputTopic(String value);
- }
-
- /**
- * Sets up and starts streaming pipeline.
- */
- public static void main(String[] args) {
- PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(PubsubFileInjectorOptions.class);
-
- Pipeline pipeline = Pipeline.create(options);
-
- pipeline
- .apply(TextIO.Read.from(options.getInput()))
- .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic()))
- .withMaxParallelism(20));
-
- pipeline.run();
- }
-}
[6/6] incubator-beam git commit: Update Beam examples archetypes
Posted by lc...@apache.org.
Update Beam examples archetypes
This closes #936
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9a08e45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9a08e45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9a08e45
Branch: refs/heads/master
Commit: e9a08e454b20c88c72ea4ce3138df77f898b26d7
Parents: e3768f6 76f0ff4
Author: Luke Cwik <lc...@google.com>
Authored: Tue Sep 13 18:01:42 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:42 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/MinimalWordCount.java | 3 +-
.../beam/examples/common/ExampleUtils.java | 2 +-
pom.xml | 14 +-
sdks/java/maven-archetypes/examples/pom.xml | 25 ++
.../main/resources/archetype-resources/pom.xml | 20 +-
.../src/main/java/DebuggingWordCount.java | 32 +-
.../src/main/java/MinimalWordCount.java | 50 ++-
.../src/main/java/WindowedWordCount.java | 139 +++----
.../src/main/java/WordCount.java | 77 ++--
.../java/common/DataflowExampleOptions.java | 32 --
.../main/java/common/DataflowExampleUtils.java | 391 -------------------
.../common/ExampleBigQueryTableOptions.java | 11 +-
.../src/main/java/common/ExampleOptions.java | 32 ++
...xamplePubsubTopicAndSubscriptionOptions.java | 45 +++
.../java/common/ExamplePubsubTopicOptions.java | 17 +-
.../src/main/java/common/ExampleUtils.java | 353 +++++++++++++++++
.../main/java/common/PubsubFileInjector.java | 153 --------
.../src/test/java/WordCountTest.java | 9 +-
sdks/java/maven-archetypes/starter/pom.xml | 8 +
.../main/resources/archetype-resources/pom.xml | 6 +-
.../resources/projects/basic/reference/pom.xml | 6 +-
21 files changed, 641 insertions(+), 784 deletions(-)
----------------------------------------------------------------------
[4/6] incubator-beam git commit: Fix the maven-archetypes pom.xml
Posted by lc...@apache.org.
Fix the maven-archetypes pom.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f30255e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f30255e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f30255e
Branch: refs/heads/master
Commit: 1f30255edcdd9c1e445b69248191c8552724f086
Parents: 1047e03
Author: Pei He <pe...@google.com>
Authored: Tue Sep 13 16:46:36 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:09 2016 -0700
----------------------------------------------------------------------
pom.xml | 14 +++++++---
sdks/java/maven-archetypes/examples/pom.xml | 28 ++++++++++++++++++++
.../main/resources/archetype-resources/pom.xml | 16 +++++------
sdks/java/maven-archetypes/starter/pom.xml | 8 ++++++
.../main/resources/archetype-resources/pom.xml | 2 +-
.../resources/projects/basic/reference/pom.xml | 2 +-
6 files changed, 54 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9e0479..242a9ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,13 +92,13 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <dataflow.javadoc_opts />
+ <dataflow.javadoc_opts/>
<!-- Disable integration tests by default -->
<skipITs>true</skipITs>
<!-- Do not add additional surefire arguments by default -->
- <beamSurefireArgline />
+ <beamSurefireArgline/>
<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
<avro.version>1.8.1</avro.version>
@@ -250,6 +250,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${project.version}</version>
</dependency>
@@ -926,7 +932,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
<pluginExecution>
@@ -940,7 +946,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index e9bb5d8..75b88e2 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -66,4 +66,32 @@
</plugins>
</pluginManagement>
</build>
+ <dependencies>
+ <!-- Adds a dependency on a specific version of the Beam SDK. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- Adds a dependency on a specific version of the Dataflow runner. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- Adds a dependency on a specific version of the Beam Google Cloud Platform IO module. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index db7b899..3f8408d 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -108,27 +108,29 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
- <version>[0-incubating, 2-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
</dependency>
<!-- Adds a dependency on a specific version of the Dataflow runner. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
- <version>[0-incubating, 2-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
- <version>[0-incubating, 2-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <scope>runtime</scope>
</dependency>
<!-- Adds a dependency on a specific version of the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
- <version>[0-incubating, 2-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
</dependency>
<dependency>
@@ -200,12 +202,6 @@
<version>19.0</version>
</dependency>
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- <version>3.1.0</version>
- </dependency>
-
<!-- Add slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 3830387..45b60a6 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -72,4 +72,12 @@
</plugins>
</pluginManagement>
</build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
index bd2de9c..fb932af 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
@@ -55,7 +55,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
- <version>[0-incubating, 1-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
</dependency>
<!-- slf4j API frontend binding with JUL backend -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f30255e/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index a778fb6..11e32fb 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -55,7 +55,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
- <version>[0-incubating, 1-incubating)</version>
+ <version>0.3.0-incubating-SNAPSHOT</version>
</dependency>
<!-- slf4j API frontend binding with JUL backend -->
[2/6] incubator-beam git commit: Update Beam examples archetypes
Posted by lc...@apache.org.
Update Beam examples archetypes
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9a66e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9a66e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9a66e4b
Branch: refs/heads/master
Commit: b9a66e4b50ae1fe5fa3afc33b2523e2f9d64b2c4
Parents: e3768f6
Author: Pei He <pe...@google.com>
Authored: Thu Sep 8 19:16:12 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:08 2016 -0700
----------------------------------------------------------------------
.../src/main/java/DebuggingWordCount.java | 31 +-
.../src/main/java/MinimalWordCount.java | 51 ++-
.../src/main/java/WindowedWordCount.java | 139 +++----
.../src/main/java/WordCount.java | 77 ++--
.../java/common/DataflowExampleOptions.java | 32 --
.../main/java/common/DataflowExampleUtils.java | 391 -------------------
.../common/ExampleBigQueryTableOptions.java | 11 +-
.../src/main/java/common/ExampleOptions.java | 37 ++
...xamplePubsubTopicAndSubscriptionOptions.java | 45 +++
.../java/common/ExamplePubsubTopicOptions.java | 17 +-
.../src/main/java/common/ExampleUtils.java | 353 +++++++++++++++++
.../main/java/common/PubsubFileInjector.java | 8 +-
.../src/test/java/WordCountTest.java | 9 +-
13 files changed, 592 insertions(+), 609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index e9f4333..e315ba9 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
@@ -36,8 +36,9 @@ import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
- * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
+ * An example that verifies word counts in Shakespeare and includes Beam best practices.
*
* <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
* detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
@@ -46,12 +47,12 @@ import org.slf4j.LoggerFactory;
*
* <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
* Reading text files; counting a PCollection; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns.
+ * and using a selected runner; defining DoFns.
*
* <p>New Concepts:
* <pre>
* 1. Logging to Cloud Logging
- * 2. Controlling Dataflow worker log levels
+ * 2. Controlling worker log levels
* 3. Creating a custom aggregator
* 4. Testing your Pipeline via PAssert
* </pre>
@@ -62,12 +63,14 @@ import org.slf4j.LoggerFactory;
* }
* </pre>
*
- * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
- * below, specify pipeline configuration:
+ * <p>To change the runner, specify:
+ * <pre>{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ * </pre>
+ *
+ * <p>To use the additional logging discussed below, specify:
* <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowRunner
* --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
* }
* </pre>
@@ -100,12 +103,12 @@ import org.slf4j.LoggerFactory;
* that changing the default worker log level to TRACE or DEBUG will significantly increase
* the amount of logs output.
*
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
+ * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
+ * and can be overridden with {@code --inputFile}.
*/
public class DebuggingWordCount {
/** A DoFn that filters for a specific key based upon a regular expression. */
- public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> {
+ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
/**
* Concept #1: The logger below uses the fully qualified class name of FilterTextFn
* as the logger. All log statements emitted by this logger will be referenced by this name
@@ -131,7 +134,7 @@ public class DebuggingWordCount {
private final Aggregator<Long, Long> unmatchedWords =
createAggregator("umatchedWords", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (filter.matcher(c.element().getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
@@ -149,7 +152,7 @@ public class DebuggingWordCount {
}
}
}
-
+
/**
* Options supported by {@link DebuggingWordCount}.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 55beb1f..f739fd8 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -17,14 +17,15 @@
*/
package ${package};
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
@@ -48,26 +49,34 @@ import org.apache.beam.sdk.values.KV;
* 4. Writing data to Cloud Storage as text files
* </pre>
*
- * <p>To execute this pipeline, first edit the code to set your project ID, the staging
+ * <p>To execute this pipeline, first edit the code to set your project ID, the temp
* location, and the output location. The specified GCS bucket(s) must already exist.
*
- * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
- * Dataflow service. No args are required to run the pipeline. You can see the results in your
+ * <p>Then, run the pipeline as described in the README. It will be deployed and run with the
+ * selected runner. No args are required to run the pipeline. You can see the results in your
* output bucket in the GCS browser.
*/
public class MinimalWordCount {
public static void main(String[] args) {
- // Create a DataflowPipelineOptions object. This object lets us set various execution
+ // Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the associated Cloud Platform project and the location
// in Google Cloud Storage to stage files.
- DataflowPipelineOptions options = PipelineOptionsFactory.create()
- .as(DataflowPipelineOptions.class);
- options.setRunner(BlockingDataflowRunner.class);
- // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
- options.setProject("SET_YOUR_PROJECT_ID_HERE");
- // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
- options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ // In order to run your pipeline, you need to make following runner specific changes:
+ //
+ // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner.
+ // CHANGE 2/3: Specify runner-required options.
+ // For DataflowRunner, set project and temp location as follows:
+ // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ // dataflowOptions.setRunner(DataflowRunner.class);
+ // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
+ // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
+ // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
+ // for more details.
+ // options.as(FlinkPipelineOptions.class)
+ // .setRunner(FlinkRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
@@ -77,13 +86,13 @@ public class MinimalWordCount {
// Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
// of input text files. TextIO.Read returns a PCollection where each element is one line from
// the input text (a set of Shakespeare's texts).
- p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+ p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
- .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() {
- @Override
+ .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
@@ -96,12 +105,12 @@ public class MinimalWordCount {
// transform returns a new PCollection of key/value pairs, where each key represents a unique
// word in the text. The associated value is the occurrence count for that word.
.apply(Count.<String>perElement())
- // Apply another ParDo transform that formats our PCollection of word counts into a printable
+ // Apply a MapElements transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
- .apply("FormatResults", ParDo.of(new OldDoFn<KV<String, Long>, String>() {
+ .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ": " + c.element().getValue());
+ public String apply(KV<String, Long> input) {
+ return input.getKey() + ": " + input.getValue();
}
}))
// Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index 17bf7ca..787e8c9 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -17,23 +17,24 @@
*/
package ${package};
+import ${package}.common.ExampleBigQueryTableOptions;
+import ${package}.common.ExampleOptions;
+import ${package}.common.ExampleUtils;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import ${package}.common.DataflowExampleUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -41,8 +42,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/**
* An example that counts words in text, and can run over either unbounded or bounded input
@@ -54,58 +54,43 @@ import org.slf4j.LoggerFactory;
*
* <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
* Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns; creating a custom aggregator;
+ * and using a selected runner; defining DoFns; creating a custom aggregator;
* user-defined PTransforms; defining PipelineOptions.
*
* <p>New Concepts:
* <pre>
* 1. Unbounded and bounded pipeline input modes
* 2. Adding timestamps to data
- * 3. PubSub topics as sources
- * 4. Windowing
- * 5. Re-using PTransforms over windowed PCollections
- * 6. Writing to BigQuery
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
+ * 3. Windowing
+ * 4. Re-using PTransforms over windowed PCollections
+ * 5. Writing to BigQuery
* </pre>
*
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <p>By default, the examples will run with the {@code DirectRunner}.
+ * To change the runner, specify:
* <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowRunner
+ * --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
+ * See examples/java/README.md for instructions about how to configure different runners.
*
* <p>Optionally specify the input file path via:
* {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
+ * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
*
* <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
* specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code dataflow-examples} must already exist in your project.
+ * dataset, a dataset called {@code beam_examples} must already exist in your project.
* {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
*
- * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or
- * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set
- * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from
- * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not
- * exist, the pipeline will create one for you. It will delete this topic when it terminates.
- * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub
- * topic with the contents of the {@code --inputFile}, in order to make the example easy to run.
- * If you want to use an independently-populated PubSub topic, indicate this by setting
- * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started.
- *
* <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
* change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
* for 10-minute windows.
+ *
+ * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exits.
*/
public class WindowedWordCount {
- private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
static final int WINDOW_SIZE = 1; // Default window duration in minutes
/**
@@ -116,14 +101,19 @@ public class WindowedWordCount {
* his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
* 2-hour period.
*/
- static class AddTimestampFn extends OldDoFn<String, String> {
- private static final long RAND_RANGE = 7200000; // 2 hours in ms
+ static class AddTimestampFn extends DoFn<String, String> {
+ private static final Duration RAND_RANGE = Duration.standardHours(2);
+ private final Instant minTimestamp;
- @Override
+ AddTimestampFn() {
+ this.minTimestamp = new Instant(System.currentTimeMillis());
+ }
+
+ @ProcessElement
public void processElement(ProcessContext c) {
// Generate a timestamp that falls somewhere in the past two hours.
- long randomTimestamp = System.currentTimeMillis()
- - (int) (Math.random() * RAND_RANGE);
+ long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
+ Instant randomTimestamp = minTimestamp.plus(randMillis);
/**
* Concept #2: Set the data element with that timestamp.
*/
@@ -132,8 +122,8 @@ public class WindowedWordCount {
}
/** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
- @Override
+ static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+ @ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("word", c.element().getKey())
@@ -157,7 +147,7 @@ public class WindowedWordCount {
}
/**
- * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one
+ * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one
* that supports both bounded and unbounded data. This is a helper method that creates a
* TableReference from input options, to tell the pipeline where to write its BigQuery results.
*/
@@ -173,56 +163,39 @@ public class WindowedWordCount {
* Options supported by {@link WindowedWordCount}.
*
* <p>Inherits standard example configuration options, which allow specification of the BigQuery
- * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for
+ * table, as well as the {@link WordCount.WordCountOptions} support for
* specification of the input file.
*/
- public static interface Options
- extends WordCount.WordCountOptions, DataflowExampleUtils.DataflowExampleUtilsOptions {
+ public static interface Options extends WordCount.WordCountOptions,
+ ExampleOptions, ExampleBigQueryTableOptions {
@Description("Fixed window duration, in minutes")
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
void setWindowSize(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- boolean isUnbounded();
- void setUnbounded(boolean value);
}
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(getSchema());
- // DataflowExampleUtils creates the necessary input sources to simplify execution of this
- // Pipeline.
- DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
- options.isUnbounded());
+ // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
/**
- * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
+ * Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or
* unbounded input source.
*/
- PCollection<String> input;
- if (options.isUnbounded()) {
- LOG.info("Reading from PubSub.");
- /**
- * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
- * specified as an argument. The data elements' timestamps will come from the pubsub
- * injection.
- */
- input = pipeline
- .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
- } else {
- /** Else, this is a bounded pipeline. Read from the GCS file. */
- input = pipeline
- .apply(TextIO.Read.from(options.getInputFile()))
- // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
- // See AddTimestampFn for more detail on this.
- .apply(ParDo.of(new AddTimestampFn()));
- }
+ PCollection<String> input = pipeline
+ /** Read from the GCS file. */
+ .apply(TextIO.Read.from(options.getInputFile()))
+ // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
+ // See AddTimestampFn for more detail on this.
+ .apply(ParDo.of(new AddTimestampFn()));
/**
- * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
+ * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
* minute (you can change this with a command-line option). See the documentation for more
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
@@ -232,29 +205,25 @@ public class WindowedWordCount {
FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
/**
- * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
+ * Concept #4: Re-use our existing CountWords transform that does not have knowledge of
* windows over a PCollection containing windowed values.
*/
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
- * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
+ * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
* The BigQuery output source supports both bounded and unbounded data.
*/
wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
- .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
+ .apply(BigQueryIO.Write
+ .to(getTableReference(options))
+ .withSchema(getSchema())
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
PipelineResult result = pipeline.run();
- /**
- * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
- * runs for a limited time, and publishes to the input PubSub topic.
- *
- * With an unbounded input source, you will need to explicitly shut down this pipeline when you
- * are done with it, so that you do not continue to be charged for the instances. You can do
- * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
- * pipelines. The PubSub topic will also be deleted at this time.
- */
- exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
+ // ExampleUtils will try to cancel the pipeline before the program exists.
+ exampleUtils.waitToFinish(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 5432036..b096d8d 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -17,7 +17,8 @@
*/
package ${package};
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import com.google.common.base.Strings;
+import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
@@ -27,17 +28,19 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
/**
- * An example that counts words in Shakespeare and includes Dataflow best practices.
+ * An example that counts words in Shakespeare and includes Beam best practices.
*
* <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
* 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
@@ -45,8 +48,8 @@ import org.apache.beam.sdk.values.PCollection;
* pipeline, for introduction of additional concepts.
*
* <p>For a detailed walkthrough of this example, see
- * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example">
- * https://cloud.google.com/dataflow/java-sdk/wordcount-example
+ * <a href="http://beam.incubator.apache.org/use/walkthroughs/">
+ * http://beam.incubator.apache.org/use/walkthroughs/
* </a>
*
* <p>Basic concepts, also in the MinimalWordCount example:
@@ -54,39 +57,29 @@ import org.apache.beam.sdk.values.PCollection;
*
* <p>New Concepts:
* <pre>
- * 1. Executing a Pipeline both locally and using the Dataflow service
+ * 1. Executing a Pipeline both locally and using the selected runner
* 2. Using ParDo with static DoFns defined out-of-line
* 3. Building a composite transform
* 4. Defining your own pipeline options
* </pre>
*
- * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service.
+ * <p>Concept #1: you can execute this pipeline either locally or using the selected runner.
* These are now command-line options and not hard-coded as they were in the MinimalWordCount
* example.
- * To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
+ * To execute this pipeline locally, specify a local output file or output prefix on GCS:
* <pre>{@code
* --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
* }</pre>
*
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <p>To change the runner, specify:
* <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowRunner
+ * --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
+ * See examples/java/README.md for instructions about how to configure different runners.
*
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
+ * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
+ * and can be overridden with {@code --inputFile}.
*/
public class WordCount {
@@ -95,11 +88,11 @@ public class WordCount {
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
@@ -117,11 +110,11 @@ public class WordCount {
}
}
- /** A DoFn that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends OldDoFn<KV<String, Long>, String> {
+ /** A SimpleFunction that converts a Word and Count into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ": " + c.element().getValue());
+ public String apply(KV<String, Long> input) {
+ return input.getKey() + ": " + input.getValue();
}
}
@@ -161,7 +154,7 @@ public class WordCount {
*/
public static interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
- @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
@@ -171,21 +164,25 @@ public class WordCount {
void setOutput(String value);
/**
- * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination.
+ * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination.
*/
public static class OutputFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.getStagingLocation() != null) {
- return GcsPath.fromUri(dataflowOptions.getStagingLocation())
- .resolve("counts.txt").toString();
+ String tempLocation = options.getTempLocation();
+ if (!Strings.isNullOrEmpty(tempLocation)) {
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ return factory.resolve(tempLocation, "counts.txt");
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve temp location: %s", tempLocation));
+ }
} else {
- throw new IllegalArgumentException("Must specify --output or --stagingLocation");
+ throw new IllegalArgumentException("Must specify --output or --tempLocation");
}
}
}
-
}
public static void main(String[] args) {
@@ -197,7 +194,7 @@ public class WordCount {
// static FormatAsTextFn() to the ParDo transform.
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
- .apply(ParDo.of(new FormatAsTextFn()))
+ .apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
deleted file mode 100644
index e3bf7c5..0000000
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
- @Description("Whether to keep jobs running on the Dataflow service after local process exit")
- @Default.Boolean(false)
- boolean getKeepJobsRunning();
- void setKeepJobsRunning(boolean keepJobsRunning);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
deleted file mode 100644
index 9e6be78..0000000
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.util.Transport;
-
-/**
- * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
- * injector, and cancels the streaming and the injector pipelines once the program terminates.
- *
- * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
- */
-public class DataflowExampleUtils {
-
- private final DataflowPipelineOptions options;
- private Bigquery bigQueryClient = null;
- private Pubsub pubsubClient = null;
- private Dataflow dataflowClient = null;
- private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
- private List<String> pendingMessages = Lists.newArrayList();
-
- /**
- * Define an interface that supports the PubSub and BigQuery example options.
- */
- public static interface DataflowExampleUtilsOptions
- extends DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
- }
-
- public DataflowExampleUtils(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Do resources and runner options setup.
- */
- public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
- throws IOException {
- this.options = options;
- setupResourcesAndRunner(isUnbounded);
- }
-
- /**
- * Sets up external resources that are required by the example,
- * such as Pub/Sub topics and BigQuery tables.
- *
- * @throws IOException if there is a problem setting up the resources
- */
- public void setup() throws IOException {
- setupPubsubTopic();
- setupBigQueryTable();
- }
-
- /**
- * Set up external resources, and configure the runner appropriately.
- */
- public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
- if (isUnbounded) {
- options.setStreaming(true);
- }
- setup();
- setupRunner();
- }
-
- /**
- * Sets up the Google Cloud Pub/Sub topic.
- *
- * <p>If the topic doesn't exist, a new topic with the given name will be created.
- *
- * @throws IOException if there is a problem setting up the Pub/Sub topic
- */
- public void setupPubsubTopic() throws IOException {
- ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
- if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
- pendingMessages.add("*******************Set Up Pubsub Topic*********************");
- setupPubsubTopic(pubsubTopicOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been set up for this example: "
- + pubsubTopicOptions.getPubsubTopic());
- }
- }
-
- /**
- * Sets up the BigQuery table with the given schema.
- *
- * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
- * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
- * will be created.
- *
- * @throws IOException if there is a problem setting up the BigQuery table
- */
- public void setupBigQueryTable() throws IOException {
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("******************Set Up Big Query Table*******************");
- setupBigQueryTable(bigQueryTableOptions.getProject(),
- bigQueryTableOptions.getBigQueryDataset(),
- bigQueryTableOptions.getBigQueryTable(),
- bigQueryTableOptions.getBigQuerySchema());
- pendingMessages.add("The BigQuery table has been set up for this example: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- }
- }
-
- /**
- * Tears down external resources that can be deleted upon the example's completion.
- */
- private void tearDown() {
- pendingMessages.add("*************************Tear Down*************************");
- ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
- if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
- try {
- deletePubsubTopic(pubsubTopicOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been deleted: "
- + pubsubTopicOptions.getPubsubTopic());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub topic : "
- + pubsubTopicOptions.getPubsubTopic());
- }
- }
-
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("The BigQuery table might contain the example's output, "
- + "and it is not deleted automatically: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- pendingMessages.add("Please go to the Developers Console to delete it manually."
- + " Otherwise, you may be charged for its usage.");
- }
- }
-
- private void setupBigQueryTable(String projectId, String datasetId, String tableId,
- TableSchema schema) throws IOException {
- if (bigQueryClient == null) {
- bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
- }
-
- Datasets datasetService = bigQueryClient.datasets();
- if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
- Dataset newDataset = new Dataset().setDatasetReference(
- new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
- datasetService.insert(projectId, newDataset).execute();
- }
-
- Tables tableService = bigQueryClient.tables();
- Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
- if (table == null) {
- Table newTable = new Table().setSchema(schema).setTableReference(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
- tableService.insert(projectId, datasetId, newTable).execute();
- } else if (!table.getSchema().equals(schema)) {
- throw new RuntimeException(
- "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
- + ", actual: " + table.getSchema().toPrettyString());
- }
- }
-
- private void setupPubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
- pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub topic.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub topic
- */
- private void deletePubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
- pubsubClient.projects().topics().delete(topic).execute();
- }
- }
-
- /**
- * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
- * start an 'injector' pipeline that publishes the contents of the file to the given topic, first
- * creating the topic if necessary.
- */
- public void startInjectorIfNeeded(String inputFile) {
- ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
- if (pubsubTopicOptions.isStreaming()
- && inputFile != null && !inputFile.isEmpty()
- && pubsubTopicOptions.getPubsubTopic() != null
- && !pubsubTopicOptions.getPubsubTopic().isEmpty()) {
- runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
- }
- }
-
- public void setupRunner() {
- if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) {
- // In order to cancel the pipelines automatically,
- // {@literal DataflowRunner} is forced to be used.
- options.setRunner(DataflowRunner.class);
- }
- }
-
- /**
- * Runs the batch injector for the streaming pipeline.
- *
- * <p>The injector pipeline will read from the given text file, and inject data
- * into the Google Cloud Pub/Sub topic.
- */
- public void runInjectorPipeline(String inputFile, String topic) {
- DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
- copiedOptions.setStreaming(false);
- copiedOptions.setWorkerHarnessContainerImage(
- DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
- copiedOptions.setNumWorkers(
- options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers());
- copiedOptions.setJobName(options.getJobName() + "-injector");
- Pipeline injectorPipeline = Pipeline.create(copiedOptions);
- injectorPipeline.apply(TextIO.Read.from(inputFile))
- .apply(IntraBundleParallelization
- .of(PubsubFileInjector.publish(topic))
- .withMaxParallelism(20));
- DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
- jobsToCancel.add(injectorJob);
- }
-
- /**
- * Runs the provided injector pipeline for the streaming pipeline.
- */
- public void runInjectorPipeline(Pipeline injectorPipeline) {
- DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
- jobsToCancel.add(injectorJob);
- }
-
- /**
- * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
- */
- public void mockUnboundedSource(String inputFile, PipelineResult result) {
- startInjectorIfNeeded(inputFile);
- waitToFinish(result);
- }
-
- /**
- * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
- * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
- */
- public void waitToFinish(PipelineResult result) {
- if (result instanceof DataflowPipelineJob) {
- final DataflowPipelineJob job = (DataflowPipelineJob) result;
- jobsToCancel.add(job);
- if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
- addShutdownHook(jobsToCancel);
- }
- try {
- job.waitUntilFinish();
- } catch (Exception e) {
- throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
- }
- } else {
- // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
- // such as EvaluationResults returned by DirectRunner.
- }
- }
-
- private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
- if (dataflowClient == null) {
- dataflowClient = options.getDataflowClient();
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- tearDown();
- printPendingMessages();
- for (DataflowPipelineJob job : jobs) {
- System.out.println("Canceling example pipeline: " + job.getJobId());
- try {
- job.cancel();
- } catch (IOException e) {
- System.out.println("Failed to cancel the job,"
- + " please go to the Developers Console to cancel it manually");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
-
- for (DataflowPipelineJob job : jobs) {
- boolean cancellationVerified = false;
- for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
- if (job.getState().isTerminal()) {
- cancellationVerified = true;
- System.out.println("Canceled example pipeline: " + job.getJobId());
- break;
- } else {
- System.out.println(
- "The example pipeline is still running. Verifying the cancellation.");
- }
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- if (!cancellationVerified) {
- System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
- System.out.println("Please go to the Developers Console to verify manually:");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
- }
- });
- }
-
- private void printPendingMessages() {
- System.out.println();
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- for (String message : pendingMessages) {
- System.out.println(message);
- }
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- }
-
- private static <T> T executeNullIfNotFound(
- AbstractGoogleClientRequest<T> request) throws IOException {
- try {
- return request.execute();
- } catch (GoogleJsonResponseException e) {
- if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) {
- return null;
- } else {
- throw e;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
index 79fa865..96e8406 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
@@ -18,19 +18,19 @@
package ${package}.common;
import com.google.api.services.bigquery.model.TableSchema;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
- * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * Options that can be used to configure BigQuery tables in Beam examples.
* The project defaults to the project being used to run the example.
*/
-public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+public interface ExampleBigQueryTableOptions extends GcpOptions {
@Description("BigQuery dataset name")
- @Default.String("dataflow_examples")
+ @Default.String("beam_examples")
String getBigQueryDataset();
void setBigQueryDataset(String dataset);
@@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
static class BigQueryTableFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- return options.as(DataflowPipelineOptions.class).getJobName()
- .replace('-', '_');
+ return options.getJobName().replace('-', '_');
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
new file mode 100644
index 0000000..90f935c
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure the Beam examples.
+ */
+public interface ExampleOptions extends PipelineOptions {
+ @Description("Whether to keep jobs running after local process exit")
+ @Default.Boolean(false)
+ boolean getKeepJobsRunning();
+ void setKeepJobsRunning(boolean keepJobsRunning);
+
+ @Description("Number of workers to use when executing the injector pipeline")
+ @Default.Integer(1)
+ int getInjectorNumWorkers();
+ void setInjectorNumWorkers(int numWorkers);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
new file mode 100644
index 0000000..e3fb132
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure Pub/Sub topic/subscription in Beam examples.
+ */
+public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
+ @Description("Pub/Sub subscription")
+ @Default.InstanceFactory(PubsubSubscriptionFactory.class)
+ String getPubsubSubscription();
+ void setPubsubSubscription(String subscription);
+
+ /**
+ * Returns a default Pub/Sub subscription based on the project and the job names.
+ */
+ static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ return "projects/" + options.as(GcpOptions.class).getProject()
+ + "/subscriptions/" + options.getJobName();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
index 8a7c9cf..1825267 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
@@ -17,36 +17,29 @@
*/
package ${package}.common;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
- * Options that can be used to configure Pub/Sub topic in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic in Beam examples.
*/
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
+public interface ExamplePubsubTopicOptions extends GcpOptions {
@Description("Pub/Sub topic")
@Default.InstanceFactory(PubsubTopicFactory.class)
String getPubsubTopic();
void setPubsubTopic(String topic);
- @Description("Number of workers to use when executing the injector pipeline")
- @Default.Integer(1)
- int getInjectorNumWorkers();
- void setInjectorNumWorkers(int numWorkers);
-
/**
* Returns a default Pub/Sub topic based on the project and the job names.
*/
static class PubsubTopicFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowPipelineOptions =
- options.as(DataflowPipelineOptions.class);
- return "projects/" + dataflowPipelineOptions.getProject()
- + "/topics/" + dataflowPipelineOptions.getJobName();
+ return "projects/" + options.as(GcpOptions.class).getProject()
+ + "/topics/" + options.getJobName();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
new file mode 100644
index 0000000..afef188
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
+
+/**
+ * The utility class that sets up and tears down external resources,
+ * and cancels the streaming pipelines once the program terminates.
+ *
+ * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class ExampleUtils {
+
+ private static final int SC_NOT_FOUND = 404;
+
+ private final PipelineOptions options;
+ private Bigquery bigQueryClient = null;
+ private Pubsub pubsubClient = null;
+ private Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
+ private List<String> pendingMessages = Lists.newArrayList();
+
+ /**
+ * Do resources and runner options setup.
+ */
+ public ExampleUtils(PipelineOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Sets up external resources that are required by the example,
+ * such as Pub/Sub topics and BigQuery tables.
+ *
+ * @throws IOException if there is a problem setting up the resources
+ */
+ public void setup() throws IOException {
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backOff =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
+ Throwable lastException = null;
+ try {
+ do {
+ try {
+ setupPubsub();
+ setupBigQueryTable();
+ return;
+ } catch (GoogleJsonResponseException e) {
+ lastException = e;
+ }
+ } while (BackOffUtils.next(sleeper, backOff));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Ignore InterruptedException
+ }
+ throw new RuntimeException(lastException);
+ }
+
+ /**
+ * Sets up the Google Cloud Pub/Sub topic.
+ *
+ * <p>If the topic doesn't exist, a new topic with the given name will be created.
+ *
+ * @throws IOException if there is a problem setting up the Pub/Sub topic
+ */
+ public void setupPubsub() throws IOException {
+ ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+ options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+ if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+ pendingMessages.add("**********************Set Up Pubsub************************");
+ setupPubsubTopic(pubsubOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+ + pubsubOptions.getPubsubTopic());
+
+ if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+ setupPubsubSubscription(
+ pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+ pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+ + pubsubOptions.getPubsubSubscription());
+ }
+ }
+ }
+
+ /**
+ * Sets up the BigQuery table with the given schema.
+ *
+ * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+ * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+ * will be created.
+ *
+ * @throws IOException if there is a problem setting up the BigQuery table
+ */
+ public void setupBigQueryTable() throws IOException {
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("******************Set Up Big Query Table*******************");
+ setupBigQueryTable(bigQueryTableOptions.getProject(),
+ bigQueryTableOptions.getBigQueryDataset(),
+ bigQueryTableOptions.getBigQueryTable(),
+ bigQueryTableOptions.getBigQuerySchema());
+ pendingMessages.add("The BigQuery table has been set up for this example: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ }
+ }
+
+ /**
+ * Tears down external resources that can be deleted upon the example's completion.
+ */
+ private void tearDown() {
+ pendingMessages.add("*************************Tear Down*************************");
+ ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+ options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+ if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+ try {
+ deletePubsubTopic(pubsubOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been deleted: "
+ + pubsubOptions.getPubsubTopic());
+ } catch (IOException e) {
+ pendingMessages.add("Failed to delete the Pub/Sub topic : "
+ + pubsubOptions.getPubsubTopic());
+ }
+ if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+ try {
+ deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+ pendingMessages.add("The Pub/Sub subscription has been deleted: "
+ + pubsubOptions.getPubsubSubscription());
+ } catch (IOException e) {
+ pendingMessages.add("Failed to delete the Pub/Sub subscription : "
+ + pubsubOptions.getPubsubSubscription());
+ }
+ }
+ }
+
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("The BigQuery table might contain the example's output, "
+ + "and it is not deleted automatically: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ pendingMessages.add("Please go to the Developers Console to delete it manually."
+ + " Otherwise, you may be charged for its usage.");
+ }
+ }
+
+ private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+ TableSchema schema) throws IOException {
+ if (bigQueryClient == null) {
+ bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+ }
+
+ Datasets datasetService = bigQueryClient.datasets();
+ if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+ Dataset newDataset = new Dataset().setDatasetReference(
+ new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+ datasetService.insert(projectId, newDataset).execute();
+ }
+
+ Tables tableService = bigQueryClient.tables();
+ Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+ if (table == null) {
+ Table newTable = new Table().setSchema(schema).setTableReference(
+ new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+ tableService.insert(projectId, datasetId, newTable).execute();
+ } else if (!table.getSchema().equals(schema)) {
+ throw new RuntimeException(
+ "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+ + ", actual: " + table.getSchema().toPrettyString());
+ }
+ }
+
+ private void setupPubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+ pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+ }
+ }
+
+ private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
+ Subscription subInfo = new Subscription()
+ .setAckDeadlineSeconds(60)
+ .setTopic(topic);
+ pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
+ }
+ }
+
+ /**
+ * Deletes the Google Cloud Pub/Sub topic.
+ *
+ * @throws IOException if there is a problem deleting the Pub/Sub topic
+ */
+ private void deletePubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+ pubsubClient.projects().topics().delete(topic).execute();
+ }
+ }
+
+ /**
+ * Deletes the Google Cloud Pub/Sub subscription.
+ *
+ * @throws IOException if there is a problem deleting the Pub/Sub subscription
+ */
+ private void deletePubsubSubscription(String subscription) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+ pubsubClient.projects().subscriptions().delete(subscription).execute();
+ }
+ }
+
+ /**
+ * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
+ * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
+ */
+ public void waitToFinish(PipelineResult result) {
+ pipelinesToCancel.add(result);
+ if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+ addShutdownHook(pipelinesToCancel);
+ }
+ try {
+ result.waitUntilFinish();
+ } catch (UnsupportedOperationException e) {
+ // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
+ // such as EvaluationResults returned by DirectRunner.
+ tearDown();
+ printPendingMessages();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to wait the pipeline until finish: " + result);
+ }
+ }
+
+ private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ tearDown();
+ printPendingMessages();
+ for (PipelineResult pipelineResult : pipelineResults) {
+ try {
+ pipelineResult.cancel();
+ } catch (IOException e) {
+ System.out.println("Failed to cancel the job.");
+ System.out.println(e.getMessage());
+ }
+ }
+
+ for (PipelineResult pipelineResult : pipelineResults) {
+ boolean cancellationVerified = false;
+ for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+ if (pipelineResult.getState().isTerminal()) {
+ cancellationVerified = true;
+ break;
+ } else {
+ System.out.println(
+ "The example pipeline is still running. Verifying the cancellation.");
+ }
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+ }
+ if (!cancellationVerified) {
+ System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
+ }
+ }
+ }
+ });
+ }
+
+ private void printPendingMessages() {
+ System.out.println();
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ for (String message : pendingMessages) {
+ System.out.println(message);
+ }
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ }
+
+ private static <T> T executeNullIfNotFound(
+ AbstractGoogleClientRequest<T> request) throws IOException {
+ try {
+ return request.execute();
+ } catch (GoogleJsonResponseException e) {
+ if (e.getStatusCode() == SC_NOT_FOUND) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
index 58e0821..6ca20f3 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
@@ -23,15 +23,15 @@ import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.IntraBundleParallelization;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.Transport;
/**
@@ -69,7 +69,7 @@ public class PubsubFileInjector {
}
}
- /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
+ /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
public static class Bound extends OldDoFn<String, Void> {
private final String outputTopic;
private final String timestampLabelKey;
@@ -83,7 +83,7 @@ public class PubsubFileInjector {
@Override
public void startBundle(Context context) {
this.pubsub =
- Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
+ Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class))
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
index 875d3d7..83d0f37 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
@@ -20,6 +20,7 @@ package ${package};
import ${package}.WordCount.CountWords;
import ${package}.WordCount.ExtractWordsFn;
import ${package}.WordCount.FormatAsTextFn;
+
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
@@ -28,8 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@@ -38,14 +40,13 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-
/**
* Tests of WordCount.
*/
@RunWith(JUnit4.class)
public class WordCountTest {
- /** Example test that tests a specific DoFn. */
+ /** Example test that tests a specific {@link DoFn}. */
@Test
public void testExtractWordsFn() throws Exception {
DoFnTester<String, String> extractWordsFn =
@@ -77,7 +78,7 @@ public class WordCountTest {
PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = input.apply(new CountWords())
- .apply(ParDo.of(new FormatAsTextFn()));
+ .apply(MapElements.via(new FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
p.run();
[3/6] incubator-beam git commit: Additional examples code cleanups
Posted by lc...@apache.org.
Additional examples code cleanups
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7fbe4103
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7fbe4103
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7fbe4103
Branch: refs/heads/master
Commit: 7fbe41035b775b631d5e9120d005497813d9e36c
Parents: b9a66e4
Author: Pei He <pe...@google.com>
Authored: Fri Sep 9 14:15:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:08 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/examples/MinimalWordCount.java | 3 +--
.../main/java/org/apache/beam/examples/common/ExampleUtils.java | 2 +-
.../archetype-resources/src/main/java/DebuggingWordCount.java | 1 -
.../archetype-resources/src/main/java/MinimalWordCount.java | 3 +--
.../src/main/java/common/ExampleOptions.java | 5 -----
.../archetype-resources/src/main/java/common/ExampleUtils.java | 2 +-
6 files changed, 4 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index f772dd5..14ffa18 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -75,8 +75,7 @@ public class MinimalWordCount {
// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
// For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
// for more details.
- // options.as(FlinkPipelineOptions.class)
- // .setRunner(FlinkRunner.class);
+ // options.setRunner(FlinkRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 2e8dcf6..1209a67 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -53,7 +53,7 @@ import org.joda.time.Duration;
* The utility class that sets up and tears down external resources,
* and cancels the streaming pipelines once the program terminates.
*
- * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ * <p>It is used to run Beam examples.
*/
public class ExampleUtils {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index e315ba9..4099a37 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -17,7 +17,6 @@
*/
package ${package};
-import ${package}.WordCount;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index f739fd8..e8497c0 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -75,8 +75,7 @@ public class MinimalWordCount {
// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
// For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
// for more details.
- // options.as(FlinkPipelineOptions.class)
- // .setRunner(FlinkRunner.class);
+ // options.setRunner(FlinkRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
index 90f935c..221e266 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
@@ -29,9 +29,4 @@ public interface ExampleOptions extends PipelineOptions {
@Default.Boolean(false)
boolean getKeepJobsRunning();
void setKeepJobsRunning(boolean keepJobsRunning);
-
- @Description("Number of workers to use when executing the injector pipeline")
- @Default.Integer(1)
- int getInjectorNumWorkers();
- void setInjectorNumWorkers(int numWorkers);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7fbe4103/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
index afef188..c1b6489 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
@@ -53,7 +53,7 @@ import org.joda.time.Duration;
* The utility class that sets up and tears down external resources,
* and cancels the streaming pipelines once the program terminates.
*
- * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ * <p>It is used to run Beam examples.
*/
public class ExampleUtils {
[5/6] incubator-beam git commit: Addressed comments
Posted by lc...@apache.org.
Addressed comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76f0ff4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76f0ff4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76f0ff4f
Branch: refs/heads/master
Commit: 76f0ff4f13bc59d8523e874a0a7edb5f0ba8a84d
Parents: 1f30255
Author: Pei He <pe...@google.com>
Authored: Tue Sep 13 17:59:40 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 13 18:01:09 2016 -0700
----------------------------------------------------------------------
sdks/java/maven-archetypes/examples/pom.xml | 3 ---
.../examples/src/main/resources/archetype-resources/pom.xml | 4 ++--
.../starter/src/main/resources/archetype-resources/pom.xml | 4 ++--
.../starter/src/test/resources/projects/basic/reference/pom.xml | 4 ++--
4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f0ff4f/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
index 75b88e2..dcdf94e 100644
--- a/sdks/java/maven-archetypes/examples/pom.xml
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -67,14 +67,12 @@
</pluginManagement>
</build>
<dependencies>
- <!-- Adds a dependency on a specific version of the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<scope>runtime</scope>
</dependency>
- <!-- Adds a dependency on a specific version of the Dataflow runnner. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
@@ -87,7 +85,6 @@
<scope>runtime</scope>
</dependency>
- <!-- Adds a dependency on a specific version of the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f0ff4f/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index 3f8408d..54b16b0 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -206,13 +206,13 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<scope>runtime</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f0ff4f/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
index fb932af..abcd0d0 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
@@ -62,12 +62,12 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
</dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f0ff4f/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 11e32fb..da94713 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -62,12 +62,12 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
- <version>1.7.7</version>
+ <version>1.7.14</version>
</dependency>
</dependencies>
</project>