You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this format.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 05:19:10 UTC
[1/2] incubator-beam git commit: Rename DataflowExampleUtils and DataflowExampleOptions
Repository: incubator-beam
Updated Branches:
refs/heads/master 290c0b772 -> 921c55c94
Rename DataflowExampleUtils and DataflowExampleOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a6f488f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a6f488f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a6f488f1
Branch: refs/heads/master
Commit: a6f488f1d3f12411002f3d0b20c74fc9b2f909df
Parents: 290c0b7
Author: Pei He <pe...@google.com>
Authored: Thu Jul 7 13:45:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 22:19:03 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 12 +-
.../examples/common/DataflowExampleOptions.java | 37 --
.../examples/common/DataflowExampleUtils.java | 404 -------------------
.../beam/examples/common/ExampleOptions.java | 37 ++
.../beam/examples/common/ExampleUtils.java | 404 +++++++++++++++++++
.../beam/examples/complete/AutoComplete.java | 4 +-
.../examples/complete/StreamingWordExtract.java | 4 +-
.../examples/complete/TrafficMaxLaneFlow.java | 9 +-
.../beam/examples/complete/TrafficRoutes.java | 9 +-
.../beam/examples/cookbook/TriggerExample.java | 9 +-
.../beam/examples/complete/game/GameStats.java | 4 +-
.../examples/complete/game/LeaderBoard.java | 8 +-
12 files changed, 467 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index d9dc26d..b32128a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.examples;
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BigQueryIO;
@@ -41,8 +41,6 @@ import com.google.api.services.bigquery.model.TableSchema;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -102,7 +100,6 @@ import java.util.List;
* and then exits.
*/
public class WindowedWordCount {
- private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
static final int WINDOW_SIZE = 1; // Default window duration in minutes
/**
@@ -179,7 +176,7 @@ public class WindowedWordCount {
* specification of the input file.
*/
public static interface Options extends WordCount.WordCountOptions,
- DataflowExampleOptions, ExampleBigQueryTableOptions {
+ ExampleOptions, ExampleBigQueryTableOptions {
@Description("Fixed window duration, in minutes")
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
@@ -195,8 +192,7 @@ public class WindowedWordCount {
options.setBigQuerySchema(getSchema());
// DataflowExampleUtils creates the necessary input sources to simplify execution of this
// Pipeline.
- DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
- options.isUnbounded());
+ ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded());
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
deleted file mode 100644
index 2e8ef3d..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
- @Description("Whether to keep jobs running on the Dataflow service after local process exit")
- @Default.Boolean(false)
- boolean getKeepJobsRunning();
- void setKeepJobsRunning(boolean keepJobsRunning);
-
- @Description("Number of workers to use when executing the injector pipeline")
- @Default.Integer(1)
- int getInjectorNumWorkers();
- void setInjectorNumWorkers(int numWorkers);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
deleted file mode 100644
index a90968a..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.Transport;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
- * injector, and cancels the streaming and the injector pipelines once the program terminates.
- *
- * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
- */
-public class DataflowExampleUtils {
-
- private static final int SC_NOT_FOUND = 404;
-
- private final DataflowPipelineOptions options;
- private Bigquery bigQueryClient = null;
- private Pubsub pubsubClient = null;
- private Dataflow dataflowClient = null;
- private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
- private List<String> pendingMessages = Lists.newArrayList();
-
- public DataflowExampleUtils(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Do resources and runner options setup.
- */
- public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
- throws IOException {
- this.options = options;
- setupResourcesAndRunner(isUnbounded);
- }
-
- /**
- * Sets up external resources that are required by the example,
- * such as Pub/Sub topics and BigQuery tables.
- *
- * @throws IOException if there is a problem setting up the resources
- */
- public void setup() throws IOException {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
- Throwable lastException = null;
- try {
- do {
- try {
- setupPubsub();
- setupBigQueryTable();
- return;
- } catch (GoogleJsonResponseException e) {
- lastException = e;
- }
- } while (BackOffUtils.next(sleeper, backOff));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Ignore InterruptedException
- }
- throw new RuntimeException(lastException);
- }
-
- /**
- * Set up external resources, and configure the runner appropriately.
- */
- public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
- if (isUnbounded) {
- options.setStreaming(true);
- }
- setup();
- setupRunner();
- }
-
- /**
- * Sets up the Google Cloud Pub/Sub topic.
- *
- * <p>If the topic doesn't exist, a new topic with the given name will be created.
- *
- * @throws IOException if there is a problem setting up the Pub/Sub topic
- */
- public void setupPubsub() throws IOException {
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- pendingMessages.add("**********************Set Up Pubsub************************");
- setupPubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been set up for this example: "
- + pubsubOptions.getPubsubTopic());
-
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- setupPubsubSubscription(
- pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- /**
- * Sets up the BigQuery table with the given schema.
- *
- * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
- * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
- * will be created.
- *
- * @throws IOException if there is a problem setting up the BigQuery table
- */
- public void setupBigQueryTable() throws IOException {
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("******************Set Up Big Query Table*******************");
- setupBigQueryTable(bigQueryTableOptions.getProject(),
- bigQueryTableOptions.getBigQueryDataset(),
- bigQueryTableOptions.getBigQueryTable(),
- bigQueryTableOptions.getBigQuerySchema());
- pendingMessages.add("The BigQuery table has been set up for this example: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- }
- }
-
- /**
- * Tears down external resources that can be deleted upon the example's completion.
- */
- private void tearDown() {
- pendingMessages.add("*************************Tear Down*************************");
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- try {
- deletePubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been deleted: "
- + pubsubOptions.getPubsubTopic());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub topic : "
- + pubsubOptions.getPubsubTopic());
- }
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- try {
- deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been deleted: "
- + pubsubOptions.getPubsubSubscription());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub subscription : "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("The BigQuery table might contain the example's output, "
- + "and it is not deleted automatically: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- pendingMessages.add("Please go to the Developers Console to delete it manually."
- + " Otherwise, you may be charged for its usage.");
- }
- }
-
- private void setupBigQueryTable(String projectId, String datasetId, String tableId,
- TableSchema schema) throws IOException {
- if (bigQueryClient == null) {
- bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
- }
-
- Datasets datasetService = bigQueryClient.datasets();
- if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
- Dataset newDataset = new Dataset().setDatasetReference(
- new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
- datasetService.insert(projectId, newDataset).execute();
- }
-
- Tables tableService = bigQueryClient.tables();
- Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
- if (table == null) {
- Table newTable = new Table().setSchema(schema).setTableReference(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
- tableService.insert(projectId, datasetId, newTable).execute();
- } else if (!table.getSchema().equals(schema)) {
- throw new RuntimeException(
- "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
- + ", actual: " + table.getSchema().toPrettyString());
- }
- }
-
- private void setupPubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
- pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
- }
- }
-
- private void setupPubsubSubscription(String topic, String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
- Subscription subInfo = new Subscription()
- .setAckDeadlineSeconds(60)
- .setTopic(topic);
- pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub topic.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub topic
- */
- private void deletePubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
- pubsubClient.projects().topics().delete(topic).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub subscription.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub subscription
- */
- private void deletePubsubSubscription(String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
- pubsubClient.projects().subscriptions().delete(subscription).execute();
- }
- }
-
- /**
- * Do some runner setup: check that the DirectRunner is not used in conjunction with
- * streaming, and if streaming is specified, use the DataflowRunner.
- */
- public void setupRunner() {
- Class<? extends PipelineRunner<?>> runner = options.getRunner();
- if (options.isStreaming()
- && (runner.equals(DataflowRunner.class)
- || runner.equals(BlockingDataflowRunner.class))) {
- // In order to cancel the pipelines automatically,
- // {@literal DataflowRunner} is forced to be used.
- options.setRunner(DataflowRunner.class);
- }
- }
-
- /**
- * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
- * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
- */
- public void waitToFinish(PipelineResult result) {
- if (result instanceof DataflowPipelineJob) {
- final DataflowPipelineJob job = (DataflowPipelineJob) result;
- jobsToCancel.add(job);
- if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
- addShutdownHook(jobsToCancel);
- }
- try {
- job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
- } catch (Exception e) {
- throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
- }
- } else {
- // Do nothing if the given PipelineResult doesn't support waitToFinish(),
- // such as EvaluationResults returned by DirectRunner.
- tearDown();
- printPendingMessages();
- }
- }
-
- private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
- if (dataflowClient == null) {
- dataflowClient = options.getDataflowClient();
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- tearDown();
- printPendingMessages();
- for (DataflowPipelineJob job : jobs) {
- System.out.println("Canceling example pipeline: " + job.getJobId());
- try {
- job.cancel();
- } catch (IOException e) {
- System.out.println("Failed to cancel the job,"
- + " please go to the Developers Console to cancel it manually");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
-
- for (DataflowPipelineJob job : jobs) {
- boolean cancellationVerified = false;
- for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
- if (job.getState().isTerminal()) {
- cancellationVerified = true;
- System.out.println("Canceled example pipeline: " + job.getJobId());
- break;
- } else {
- System.out.println(
- "The example pipeline is still running. Verifying the cancellation.");
- }
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
- }
- if (!cancellationVerified) {
- System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
- System.out.println("Please go to the Developers Console to verify manually:");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
- }
- });
- }
-
- private void printPendingMessages() {
- System.out.println();
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- for (String message : pendingMessages) {
- System.out.println(message);
- }
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- }
-
- private static <T> T executeNullIfNotFound(
- AbstractGoogleClientRequest<T> request) throws IOException {
- try {
- return request.execute();
- } catch (GoogleJsonResponseException e) {
- if (e.getStatusCode() == SC_NOT_FOUND) {
- return null;
- } else {
- throw e;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
new file mode 100644
index 0000000..bba7b21
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+
+/**
+ * Options that can be used to configure the Beam examples.
+ */
+public interface ExampleOptions extends DataflowPipelineOptions {
+ @Description("Whether to keep jobs running on the Dataflow service after local process exit")
+ @Default.Boolean(false)
+ boolean getKeepJobsRunning();
+ void setKeepJobsRunning(boolean keepJobsRunning);
+
+ @Description("Number of workers to use when executing the injector pipeline")
+ @Default.Integer(1)
+ int getInjectorNumWorkers();
+ void setInjectorNumWorkers(int numWorkers);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
new file mode 100644
index 0000000..e30b1e4
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The utility class that sets up and tears down external resources,
+ * and cancels the streaming pipelines once the program terminates.
+ *
+ * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class ExampleUtils {
+
+ private static final int SC_NOT_FOUND = 404;
+
+ private final DataflowPipelineOptions options;
+ private Bigquery bigQueryClient = null;
+ private Pubsub pubsubClient = null;
+ private Dataflow dataflowClient = null;
+ private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+ private List<String> pendingMessages = Lists.newArrayList();
+
+ public ExampleUtils(DataflowPipelineOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Do resources and runner options setup.
+ */
+ public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
+ throws IOException {
+ this.options = options;
+ setupResourcesAndRunner(isUnbounded);
+ }
+
+ /**
+ * Sets up external resources that are required by the example,
+ * such as Pub/Sub topics and BigQuery tables.
+ *
+ * @throws IOException if there is a problem setting up the resources
+ */
+ public void setup() throws IOException {
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
+ Throwable lastException = null;
+ try {
+ do {
+ try {
+ setupPubsub();
+ setupBigQueryTable();
+ return;
+ } catch (GoogleJsonResponseException e) {
+ lastException = e;
+ }
+ } while (BackOffUtils.next(sleeper, backOff));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Ignore InterruptedException
+ }
+ throw new RuntimeException(lastException);
+ }
+
+ /**
+ * Set up external resources, and configure the runner appropriately.
+ */
+ public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
+ if (isUnbounded) {
+ options.setStreaming(true);
+ }
+ setup();
+ setupRunner();
+ }
+
+ /**
+ * Sets up the Google Cloud Pub/Sub topic.
+ *
+ * <p>If the topic doesn't exist, a new topic with the given name will be created.
+ *
+ * @throws IOException if there is a problem setting up the Pub/Sub topic
+ */
+ public void setupPubsub() throws IOException {
+ ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+ options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+ if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+ pendingMessages.add("**********************Set Up Pubsub************************");
+ setupPubsubTopic(pubsubOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+ + pubsubOptions.getPubsubTopic());
+
+ if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+ setupPubsubSubscription(
+ pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+ pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+ + pubsubOptions.getPubsubSubscription());
+ }
+ }
+ }
+
+ /**
+ * Sets up the BigQuery table with the given schema.
+ *
+ * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+ * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+ * will be created.
+ *
+ * @throws IOException if there is a problem setting up the BigQuery table
+ */
+ public void setupBigQueryTable() throws IOException {
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("******************Set Up Big Query Table*******************");
+ setupBigQueryTable(bigQueryTableOptions.getProject(),
+ bigQueryTableOptions.getBigQueryDataset(),
+ bigQueryTableOptions.getBigQueryTable(),
+ bigQueryTableOptions.getBigQuerySchema());
+ pendingMessages.add("The BigQuery table has been set up for this example: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ }
+ }
+
+  /**
+   * Tears down external resources that can be deleted upon the example's completion.
+   *
+   * <p>If a Pub/Sub topic is configured it is deleted, and then, if a subscription is also
+   * configured, the subscription is deleted too. Deletion is best effort: a failure is recorded
+   * in the pending messages, together with its reason, rather than thrown. The BigQuery table is
+   * never deleted because it may hold the example's output; a reminder to delete it manually is
+   * recorded instead.
+   */
+  private void tearDown() {
+    pendingMessages.add("*************************Tear Down*************************");
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      try {
+        deletePubsubTopic(pubsubOptions.getPubsubTopic());
+        pendingMessages.add("The Pub/Sub topic has been deleted: "
+            + pubsubOptions.getPubsubTopic());
+      } catch (IOException e) {
+        // Best effort: record why the deletion failed instead of aborting the teardown.
+        pendingMessages.add("Failed to delete the Pub/Sub topic : "
+            + pubsubOptions.getPubsubTopic() + " (" + e + ")");
+      }
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        try {
+          deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+          pendingMessages.add("The Pub/Sub subscription has been deleted: "
+              + pubsubOptions.getPubsubSubscription());
+        } catch (IOException e) {
+          // Best effort: record why the deletion failed instead of aborting the teardown.
+          pendingMessages.add("Failed to delete the Pub/Sub subscription : "
+              + pubsubOptions.getPubsubSubscription() + " (" + e + ")");
+        }
+      }
+    }
+
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("The BigQuery table might contain the example's output, "
+          + "and it is not deleted automatically: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+      pendingMessages.add("Please go to the Developers Console to delete it manually."
+          + " Otherwise, you may be charged for its usage.");
+    }
+  }
+
+  /**
+   * Ensures that the BigQuery dataset and table exist, creating whichever one is missing.
+   *
+   * <p>A newly created table gets {@code schema}. An existing table must already have exactly
+   * {@code schema}; an existing table without any schema is treated as a mismatch.
+   *
+   * @throws RuntimeException if the table exists and its schema differs from {@code schema}
+   * @throws IOException if a BigQuery request fails
+   */
+  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+      TableSchema schema) throws IOException {
+    if (bigQueryClient == null) {
+      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+    }
+
+    Datasets datasetService = bigQueryClient.datasets();
+    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+      Dataset newDataset = new Dataset().setDatasetReference(
+          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+      datasetService.insert(projectId, newDataset).execute();
+    }
+
+    Tables tableService = bigQueryClient.tables();
+    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+    if (table == null) {
+      Table newTable = new Table().setSchema(schema).setTableReference(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+      tableService.insert(projectId, datasetId, newTable).execute();
+    } else {
+      TableSchema existingSchema = table.getSchema();
+      // A table can exist without a schema; compare null-safely so that case produces the
+      // descriptive mismatch error below rather than a NullPointerException.
+      if (!schema.equals(existingSchema)) {
+        throw new RuntimeException(
+            "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+            + ", actual: "
+            + (existingSchema == null ? "null" : existingSchema.toPrettyString()));
+      }
+    }
+  }
+
+  /** Creates the Pub/Sub topic {@code topic} unless it already exists. */
+  private void setupPubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    boolean alreadyExists =
+        executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null;
+    if (alreadyExists) {
+      return;
+    }
+    pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+  }
+
+  /**
+   * Creates the Pub/Sub subscription {@code subscription} on {@code topic}, with a 60-second
+   * acknowledgement deadline, unless the subscription already exists.
+   */
+  private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+      // Already present: leave it untouched.
+      return;
+    }
+    Subscription config = new Subscription().setTopic(topic).setAckDeadlineSeconds(60);
+    pubsubClient.projects().subscriptions().create(subscription, config).execute();
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub topic, if it exists.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub topic
+   */
+  private void deletePubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    boolean exists = executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null;
+    if (!exists) {
+      // Nothing to delete.
+      return;
+    }
+    pubsubClient.projects().topics().delete(topic).execute();
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub subscription, if it exists.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub subscription
+   */
+  private void deletePubsubSubscription(String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    boolean exists =
+        executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null;
+    if (!exists) {
+      // Nothing to delete.
+      return;
+    }
+    pubsubClient.projects().subscriptions().delete(subscription).execute();
+  }
+
+  /**
+   * Do some runner setup: when streaming is enabled and a Dataflow runner
+   * ({@code DataflowRunner} or {@code BlockingDataflowRunner}) was requested, force the
+   * {@code DataflowRunner}.
+   *
+   * <p>This is done so that the pipeline can be cancelled automatically when the example
+   * finishes. Any other runner is left unchanged; note that this method does NOT check whether
+   * the DirectRunner is being used together with streaming.
+   */
+  public void setupRunner() {
+    Class<? extends PipelineRunner<?>> runner = options.getRunner();
+    // isStreaming() is evaluated first, so the runner is only inspected for streaming jobs.
+    if (options.isStreaming()
+        && (runner.equals(DataflowRunner.class)
+            || runner.equals(BlockingDataflowRunner.class))) {
+      // In order to cancel the pipelines automatically, DataflowRunner is forced to be used.
+      options.setRunner(DataflowRunner.class);
+    }
+  }
+
+  /**
+   * Waits for a Dataflow pipeline to finish, or cleans up immediately for other results.
+   *
+   * <p>If {@code result} is a {@link DataflowPipelineJob}, the job is recorded for cancellation
+   * and, unless {@code ExampleOptions#getKeepJobsRunning()} is true, a shutdown hook is installed
+   * that tears down the example's resources and cancels the job before the program exits. This
+   * method then blocks until the job finishes.
+   *
+   * <p>For any other {@link PipelineResult} (such as one returned by the DirectRunner), this
+   * method does not wait; it tears down the example's resources and prints the pending messages
+   * right away.
+   *
+   * @throws RuntimeException if waiting for the Dataflow job fails; the underlying failure is
+   *     attached as the cause
+   */
+  public void waitToFinish(PipelineResult result) {
+    if (result instanceof DataflowPipelineJob) {
+      final DataflowPipelineJob job = (DataflowPipelineJob) result;
+      jobsToCancel.add(job);
+      if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+        addShutdownHook(jobsToCancel);
+      }
+      try {
+        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // Restore the interrupt flag so callers can still observe the interruption.
+          Thread.currentThread().interrupt();
+        }
+        // Keep the original failure as the cause instead of discarding it.
+        throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId(), e);
+      }
+    } else {
+      // This method does not wait on non-Dataflow results (such as the DirectRunner's), so
+      // clean up and report right away.
+      tearDown();
+      printPendingMessages();
+    }
+  }
+
+  /**
+   * Registers a JVM shutdown hook that cleans up after the example and cancels {@code jobs}.
+   *
+   * <p>When run, the hook tears down the example's resources, prints the pending messages,
+   * requests cancellation of every job, and then checks each job up to 6 times, 10 seconds
+   * apart, for a terminal state. Jobs that cannot be cancelled or verified are reported together
+   * with their monitoring page URL so they can be handled manually.
+   */
+  private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
+    if (dataflowClient == null) {
+      dataflowClient = options.getDataflowClient();
+    }
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        tearDown();
+        printPendingMessages();
+        for (DataflowPipelineJob job : jobs) {
+          System.out.println("Canceling example pipeline: " + job.getJobId());
+          try {
+            job.cancel();
+          } catch (IOException e) {
+            System.out.println("Failed to cancel the job,"
+                + " please go to the Developers Console to cancel it manually");
+            System.out.println(
+                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+          }
+        }
+
+        for (DataflowPipelineJob job : jobs) {
+          boolean cancellationVerified = false;
+          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+            if (job.getState().isTerminal()) {
+              cancellationVerified = true;
+              System.out.println("Canceled example pipeline: " + job.getJobId());
+              break;
+            }
+            System.out.println(
+                "The example pipeline is still running. Verifying the cancellation.");
+            if (retryAttempts > 1) {
+              // No sleep after the final check: it would only delay JVM shutdown.
+              Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+            }
+          }
+          if (!cancellationVerified) {
+            System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
+            System.out.println("Please go to the Developers Console to verify manually:");
+            System.out.println(
+                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Prints every pending message to standard output, after an empty line and between two
+   * banner lines above and two below.
+   */
+  private void printPendingMessages() {
+    String banner = "***********************************************************";
+    System.out.println();
+    System.out.println(banner);
+    System.out.println(banner);
+    for (String line : pendingMessages) {
+      System.out.println(line);
+    }
+    System.out.println(banner);
+    System.out.println(banner);
+  }
+
+  /**
+   * Executes {@code request}, returning {@code null} instead of throwing when the service
+   * responds with a not-found status ({@code SC_NOT_FOUND}).
+   *
+   * @throws IOException if the request fails for any other reason
+   */
+  private static <T> T executeNullIfNotFound(
+      AbstractGoogleClientRequest<T> request) throws IOException {
+    try {
+      return request.execute();
+    } catch (GoogleJsonResponseException e) {
+      if (e.getStatusCode() != SC_NOT_FOUND) {
+        throw e;
+      }
+      // A missing resource is reported to the caller as null.
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 98c4994..f8cd0f1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -20,8 +20,8 @@ package org.apache.beam.examples.complete;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -451,7 +451,7 @@ public class AutoComplete {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(FormatForBigquery.getSchema());
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ ExampleUtils dataflowUtils = new ExampleUtils(options);
// We support running the same pipeline in either
// batch or windowed streaming mode.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 4ea199c..046428c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.examples.complete;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BigQueryIO;
@@ -120,7 +120,7 @@ public class StreamingWordExtract {
options.setStreaming(true);
options.setBigQuerySchema(StringToRowConverter.getSchema());
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ ExampleUtils dataflowUtils = new ExampleUtils(options);
dataflowUtils.setup();
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 2db7c9e..1bbc68b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.examples.complete;
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -307,8 +307,7 @@ public class TrafficMaxLaneFlow {
*
* <p>Inherits standard configuration options.
*/
- private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
- ExampleBigQueryTableOptions {
+ private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
@@ -342,7 +341,7 @@ public class TrafficMaxLaneFlow {
.as(TrafficMaxLaneFlowOptions.class);
options.setBigQuerySchema(FormatMaxesFn.getSchema());
// Using DataflowExampleUtils to set up required resources.
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+ ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 89cfbfc..8af0922 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.examples.complete;
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -317,8 +317,7 @@ public class TrafficRoutes {
*
* <p>Inherits standard configuration options.
*/
- private interface TrafficRoutesOptions extends DataflowExampleOptions,
- ExampleBigQueryTableOptions {
+ private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
@@ -353,7 +352,7 @@ public class TrafficRoutes {
options.setBigQuerySchema(FormatStatsFn.getSchema());
// Using DataflowExampleUtils to set up required resources.
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+ ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 5e60835..aa91ac6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.examples.cookbook;
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BigQueryIO;
@@ -419,8 +419,7 @@ public class TriggerExample {
/**
* Inherits standard configuration options.
*/
- public interface TrafficFlowOptions
- extends ExampleBigQueryTableOptions, DataflowExampleOptions {
+ public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions {
@Description("Input file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
@@ -444,7 +443,7 @@ public class TriggerExample {
options.setBigQuerySchema(getSchema());
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ ExampleUtils dataflowUtils = new ExampleUtils(options);
dataflowUtils.setup();
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b1cb312..5b27f83 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.examples.complete.game;
-import org.apache.beam.examples.common.DataflowExampleUtils;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -242,7 +242,7 @@ public class GameStats extends LeaderBoard {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ ExampleUtils dataflowUtils = new ExampleUtils(options);
Pipeline pipeline = Pipeline.create(options);
// Read Events from Pub/Sub using custom timestamps
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a14d533..051b4de 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.examples.complete.game;
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
@@ -102,7 +102,7 @@ public class LeaderBoard extends HourlyTeamScore {
/**
* Options supported by {@link LeaderBoard}.
*/
- static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
+ static interface Options extends HourlyTeamScore.Options, ExampleOptions {
@Description("Pub/Sub topic to read from")
@Validation.Required
@@ -178,7 +178,7 @@ public class LeaderBoard extends HourlyTeamScore {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ ExampleUtils dataflowUtils = new ExampleUtils(options);
Pipeline pipeline = Pipeline.create(options);
// Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
[2/2] incubator-beam git commit: Closes #605
Posted by dh...@apache.org.
Closes #605
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/921c55c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/921c55c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/921c55c9
Branch: refs/heads/master
Commit: 921c55c94f72210d010359e628f2dcfe866f84d5
Parents: 290c0b7 a6f488f
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jul 7 22:19:04 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 22:19:04 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 12 +-
.../examples/common/DataflowExampleOptions.java | 37 --
.../examples/common/DataflowExampleUtils.java | 404 -------------------
.../beam/examples/common/ExampleOptions.java | 37 ++
.../beam/examples/common/ExampleUtils.java | 404 +++++++++++++++++++
.../beam/examples/complete/AutoComplete.java | 4 +-
.../examples/complete/StreamingWordExtract.java | 4 +-
.../examples/complete/TrafficMaxLaneFlow.java | 9 +-
.../beam/examples/complete/TrafficRoutes.java | 9 +-
.../beam/examples/cookbook/TriggerExample.java | 9 +-
.../beam/examples/complete/game/GameStats.java | 4 +-
.../examples/complete/game/LeaderBoard.java | 8 +-
12 files changed, 467 insertions(+), 474 deletions(-)
----------------------------------------------------------------------