You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 05:19:10 UTC

[1/2] incubator-beam git commit: Rename DataflowExampleUtils and DataflowExampleOptions

Repository: incubator-beam
Updated Branches:
  refs/heads/master 290c0b772 -> 921c55c94


Rename DataflowExampleUtils and DataflowExampleOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a6f488f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a6f488f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a6f488f1

Branch: refs/heads/master
Commit: a6f488f1d3f12411002f3d0b20c74fc9b2f909df
Parents: 290c0b7
Author: Pei He <pe...@google.com>
Authored: Thu Jul 7 13:45:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 22:19:03 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java |  12 +-
 .../examples/common/DataflowExampleOptions.java |  37 --
 .../examples/common/DataflowExampleUtils.java   | 404 -------------------
 .../beam/examples/common/ExampleOptions.java    |  37 ++
 .../beam/examples/common/ExampleUtils.java      | 404 +++++++++++++++++++
 .../beam/examples/complete/AutoComplete.java    |   4 +-
 .../examples/complete/StreamingWordExtract.java |   4 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |   9 +-
 .../beam/examples/complete/TrafficRoutes.java   |   9 +-
 .../beam/examples/cookbook/TriggerExample.java  |   9 +-
 .../beam/examples/complete/game/GameStats.java  |   4 +-
 .../examples/complete/game/LeaderBoard.java     |   8 +-
 12 files changed, 467 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index d9dc26d..b32128a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples;
 
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
@@ -41,8 +41,6 @@ import com.google.api.services.bigquery.model.TableSchema;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -102,7 +100,6 @@ import java.util.List;
  * and then exits.
  */
 public class WindowedWordCount {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
     static final int WINDOW_SIZE = 1;  // Default window duration in minutes
 
   /**
@@ -179,7 +176,7 @@ public class WindowedWordCount {
    * specification of the input file.
    */
   public static interface Options extends WordCount.WordCountOptions,
-      DataflowExampleOptions, ExampleBigQueryTableOptions {
+      ExampleOptions, ExampleBigQueryTableOptions {
     @Description("Fixed window duration, in minutes")
     @Default.Integer(WINDOW_SIZE)
     Integer getWindowSize();
@@ -195,8 +192,7 @@ public class WindowedWordCount {
     options.setBigQuerySchema(getSchema());
     // DataflowExampleUtils creates the necessary input sources to simplify execution of this
     // Pipeline.
-    DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
-      options.isUnbounded());
+    ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded());
 
     Pipeline pipeline = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
deleted file mode 100644
index 2e8ef3d..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
-  @Description("Whether to keep jobs running on the Dataflow service after local process exit")
-  @Default.Boolean(false)
-  boolean getKeepJobsRunning();
-  void setKeepJobsRunning(boolean keepJobsRunning);
-
-  @Description("Number of workers to use when executing the injector pipeline")
-  @Default.Integer(1)
-  int getInjectorNumWorkers();
-  void setInjectorNumWorkers(int numWorkers);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
deleted file mode 100644
index a90968a..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.Transport;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
- * injector, and cancels the streaming and the injector pipelines once the program terminates.
- *
- * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
- */
-public class DataflowExampleUtils {
-
-  private static final int SC_NOT_FOUND = 404;
-
-  private final DataflowPipelineOptions options;
-  private Bigquery bigQueryClient = null;
-  private Pubsub pubsubClient = null;
-  private Dataflow dataflowClient = null;
-  private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
-  private List<String> pendingMessages = Lists.newArrayList();
-
-  public DataflowExampleUtils(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Do resources and runner options setup.
-   */
-  public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
-      throws IOException {
-    this.options = options;
-    setupResourcesAndRunner(isUnbounded);
-  }
-
-  /**
-   * Sets up external resources that are required by the example,
-   * such as Pub/Sub topics and BigQuery tables.
-   *
-   * @throws IOException if there is a problem setting up the resources
-   */
-  public void setup() throws IOException {
-    Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
-    Throwable lastException = null;
-    try {
-      do {
-        try {
-          setupPubsub();
-          setupBigQueryTable();
-          return;
-        } catch (GoogleJsonResponseException e) {
-          lastException = e;
-        }
-      } while (BackOffUtils.next(sleeper, backOff));
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      // Ignore InterruptedException
-    }
-    throw new RuntimeException(lastException);
-  }
-
-  /**
-   * Set up external resources, and configure the runner appropriately.
-   */
-  public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
-    if (isUnbounded) {
-      options.setStreaming(true);
-    }
-    setup();
-    setupRunner();
-  }
-
-  /**
-   * Sets up the Google Cloud Pub/Sub topic.
-   *
-   * <p>If the topic doesn't exist, a new topic with the given name will be created.
-   *
-   * @throws IOException if there is a problem setting up the Pub/Sub topic
-   */
-  public void setupPubsub() throws IOException {
-    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
-        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
-    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
-      pendingMessages.add("**********************Set Up Pubsub************************");
-      setupPubsubTopic(pubsubOptions.getPubsubTopic());
-      pendingMessages.add("The Pub/Sub topic has been set up for this example: "
-          + pubsubOptions.getPubsubTopic());
-
-      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
-        setupPubsubSubscription(
-            pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
-        pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
-            + pubsubOptions.getPubsubSubscription());
-      }
-    }
-  }
-
-  /**
-   * Sets up the BigQuery table with the given schema.
-   *
-   * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
-   * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
-   * will be created.
-   *
-   * @throws IOException if there is a problem setting up the BigQuery table
-   */
-  public void setupBigQueryTable() throws IOException {
-    ExampleBigQueryTableOptions bigQueryTableOptions =
-        options.as(ExampleBigQueryTableOptions.class);
-    if (bigQueryTableOptions.getBigQueryDataset() != null
-        && bigQueryTableOptions.getBigQueryTable() != null
-        && bigQueryTableOptions.getBigQuerySchema() != null) {
-      pendingMessages.add("******************Set Up Big Query Table*******************");
-      setupBigQueryTable(bigQueryTableOptions.getProject(),
-                         bigQueryTableOptions.getBigQueryDataset(),
-                         bigQueryTableOptions.getBigQueryTable(),
-                         bigQueryTableOptions.getBigQuerySchema());
-      pendingMessages.add("The BigQuery table has been set up for this example: "
-          + bigQueryTableOptions.getProject()
-          + ":" + bigQueryTableOptions.getBigQueryDataset()
-          + "." + bigQueryTableOptions.getBigQueryTable());
-    }
-  }
-
-  /**
-   * Tears down external resources that can be deleted upon the example's completion.
-   */
-  private void tearDown() {
-    pendingMessages.add("*************************Tear Down*************************");
-    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
-        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
-    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
-      try {
-        deletePubsubTopic(pubsubOptions.getPubsubTopic());
-        pendingMessages.add("The Pub/Sub topic has been deleted: "
-            + pubsubOptions.getPubsubTopic());
-      } catch (IOException e) {
-        pendingMessages.add("Failed to delete the Pub/Sub topic : "
-            + pubsubOptions.getPubsubTopic());
-      }
-      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
-        try {
-          deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
-          pendingMessages.add("The Pub/Sub subscription has been deleted: "
-              + pubsubOptions.getPubsubSubscription());
-        } catch (IOException e) {
-          pendingMessages.add("Failed to delete the Pub/Sub subscription : "
-              + pubsubOptions.getPubsubSubscription());
-        }
-      }
-    }
-
-    ExampleBigQueryTableOptions bigQueryTableOptions =
-        options.as(ExampleBigQueryTableOptions.class);
-    if (bigQueryTableOptions.getBigQueryDataset() != null
-        && bigQueryTableOptions.getBigQueryTable() != null
-        && bigQueryTableOptions.getBigQuerySchema() != null) {
-      pendingMessages.add("The BigQuery table might contain the example's output, "
-          + "and it is not deleted automatically: "
-          + bigQueryTableOptions.getProject()
-          + ":" + bigQueryTableOptions.getBigQueryDataset()
-          + "." + bigQueryTableOptions.getBigQueryTable());
-      pendingMessages.add("Please go to the Developers Console to delete it manually."
-          + " Otherwise, you may be charged for its usage.");
-    }
-  }
-
-  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
-      TableSchema schema) throws IOException {
-    if (bigQueryClient == null) {
-      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
-    }
-
-    Datasets datasetService = bigQueryClient.datasets();
-    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
-      Dataset newDataset = new Dataset().setDatasetReference(
-          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
-      datasetService.insert(projectId, newDataset).execute();
-    }
-
-    Tables tableService = bigQueryClient.tables();
-    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
-    if (table == null) {
-      Table newTable = new Table().setSchema(schema).setTableReference(
-          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
-      tableService.insert(projectId, datasetId, newTable).execute();
-    } else if (!table.getSchema().equals(schema)) {
-      throw new RuntimeException(
-          "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
-          + ", actual: " + table.getSchema().toPrettyString());
-    }
-  }
-
-  private void setupPubsubTopic(String topic) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
-      pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
-    }
-  }
-
-  private void setupPubsubSubscription(String topic, String subscription) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
-      Subscription subInfo = new Subscription()
-        .setAckDeadlineSeconds(60)
-        .setTopic(topic);
-      pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
-    }
-  }
-
-  /**
-   * Deletes the Google Cloud Pub/Sub topic.
-   *
-   * @throws IOException if there is a problem deleting the Pub/Sub topic
-   */
-  private void deletePubsubTopic(String topic) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
-      pubsubClient.projects().topics().delete(topic).execute();
-    }
-  }
-
-  /**
-   * Deletes the Google Cloud Pub/Sub subscription.
-   *
-   * @throws IOException if there is a problem deleting the Pub/Sub subscription
-   */
-  private void deletePubsubSubscription(String subscription) throws IOException {
-    if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
-    }
-    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
-      pubsubClient.projects().subscriptions().delete(subscription).execute();
-    }
-  }
-
-  /**
-   * Do some runner setup: check that the DirectRunner is not used in conjunction with
-   * streaming, and if streaming is specified, use the DataflowRunner.
-   */
-  public void setupRunner() {
-    Class<? extends PipelineRunner<?>> runner = options.getRunner();
-    if (options.isStreaming()
-        && (runner.equals(DataflowRunner.class)
-            || runner.equals(BlockingDataflowRunner.class))) {
-      // In order to cancel the pipelines automatically,
-      // {@literal DataflowRunner} is forced to be used.
-      options.setRunner(DataflowRunner.class);
-    }
-  }
-
-  /**
-   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
-   * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
-   */
-  public void waitToFinish(PipelineResult result) {
-    if (result instanceof DataflowPipelineJob) {
-      final DataflowPipelineJob job = (DataflowPipelineJob) result;
-      jobsToCancel.add(job);
-      if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
-        addShutdownHook(jobsToCancel);
-      }
-      try {
-        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
-      }
-    } else {
-      // Do nothing if the given PipelineResult doesn't support waitToFinish(),
-      // such as EvaluationResults returned by DirectRunner.
-      tearDown();
-      printPendingMessages();
-    }
-  }
-
-  private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
-    if (dataflowClient == null) {
-      dataflowClient = options.getDataflowClient();
-    }
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        tearDown();
-        printPendingMessages();
-        for (DataflowPipelineJob job : jobs) {
-          System.out.println("Canceling example pipeline: " + job.getJobId());
-          try {
-            job.cancel();
-          } catch (IOException e) {
-            System.out.println("Failed to cancel the job,"
-                + " please go to the Developers Console to cancel it manually");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
-          }
-        }
-
-        for (DataflowPipelineJob job : jobs) {
-          boolean cancellationVerified = false;
-          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
-            if (job.getState().isTerminal()) {
-              cancellationVerified = true;
-              System.out.println("Canceled example pipeline: " + job.getJobId());
-              break;
-            } else {
-              System.out.println(
-                  "The example pipeline is still running. Verifying the cancellation.");
-            }
-            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-          }
-          if (!cancellationVerified) {
-            System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
-            System.out.println("Please go to the Developers Console to verify manually:");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
-          }
-        }
-      }
-    });
-  }
-
-  private void printPendingMessages() {
-    System.out.println();
-    System.out.println("***********************************************************");
-    System.out.println("***********************************************************");
-    for (String message : pendingMessages) {
-      System.out.println(message);
-    }
-    System.out.println("***********************************************************");
-    System.out.println("***********************************************************");
-  }
-
-  private static <T> T executeNullIfNotFound(
-      AbstractGoogleClientRequest<T> request) throws IOException {
-    try {
-      return request.execute();
-    } catch (GoogleJsonResponseException e) {
-      if (e.getStatusCode() == SC_NOT_FOUND) {
-        return null;
-      } else {
-        throw e;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
new file mode 100644
index 0000000..bba7b21
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+
+/**
+ * Options that can be used to configure the Beam examples.
+ */
+public interface ExampleOptions extends DataflowPipelineOptions {
+  @Description("Whether to keep jobs running on the Dataflow service after local process exit")
+  @Default.Boolean(false)
+  boolean getKeepJobsRunning();
+  void setKeepJobsRunning(boolean keepJobsRunning);
+
+  @Description("Number of workers to use when executing the injector pipeline")
+  @Default.Integer(1)
+  int getInjectorNumWorkers();
+  void setInjectorNumWorkers(int numWorkers);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
new file mode 100644
index 0000000..e30b1e4
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The utility class that sets up and tears down external resources,
+ * and cancels the streaming pipelines once the program terminates.
+ *
+ * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class ExampleUtils {
+
+  private static final int SC_NOT_FOUND = 404;
+
+  private final DataflowPipelineOptions options;
+  private Bigquery bigQueryClient = null;
+  private Pubsub pubsubClient = null;
+  private Dataflow dataflowClient = null;
+  private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+  private List<String> pendingMessages = Lists.newArrayList();
+
+  public ExampleUtils(DataflowPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Sets up the external resources and the runner options.
+   */
+  public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
+      throws IOException {
+    this.options = options;
+    setupResourcesAndRunner(isUnbounded);
+  }
+
+  /**
+   * Sets up external resources that are required by the example,
+   * such as Pub/Sub topics and BigQuery tables.
+   *
+   * @throws IOException if there is a problem setting up the resources
+   */
+  public void setup() throws IOException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
+    Throwable lastException = null;
+    try {
+      do {
+        try {
+          setupPubsub();
+          setupBigQueryTable();
+          return;
+        } catch (GoogleJsonResponseException e) {
+          lastException = e;
+        }
+      } while (BackOffUtils.next(sleeper, backOff));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // Interrupted while backing off: stop retrying and report the last failure below.
+    }
+    throw new RuntimeException(lastException);
+  }
+
+  /**
+   * Set up external resources, and configure the runner appropriately.
+   */
+  public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
+    if (isUnbounded) {
+      options.setStreaming(true);
+    }
+    setup();
+    setupRunner();
+  }
+
+  /**
+   * Sets up the Google Cloud Pub/Sub topic.
+   *
+   * <p>If the topic doesn't exist, a new topic with the given name will be created.
+   *
+   * @throws IOException if there is a problem setting up the Pub/Sub topic
+   */
+  public void setupPubsub() throws IOException {
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      pendingMessages.add("**********************Set Up Pubsub************************");
+      setupPubsubTopic(pubsubOptions.getPubsubTopic());
+      pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+          + pubsubOptions.getPubsubTopic());
+
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        setupPubsubSubscription(
+            pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+        pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+            + pubsubOptions.getPubsubSubscription());
+      }
+    }
+  }
+
+  /**
+   * Sets up the BigQuery table with the given schema.
+   *
+   * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+   * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+   * will be created.
+   *
+   * @throws IOException if there is a problem setting up the BigQuery table
+   */
+  public void setupBigQueryTable() throws IOException {
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("******************Set Up Big Query Table*******************");
+      setupBigQueryTable(bigQueryTableOptions.getProject(),
+                         bigQueryTableOptions.getBigQueryDataset(),
+                         bigQueryTableOptions.getBigQueryTable(),
+                         bigQueryTableOptions.getBigQuerySchema());
+      pendingMessages.add("The BigQuery table has been set up for this example: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+    }
+  }
+
+  /**
+   * Tears down external resources that can be deleted upon the example's completion.
+   */
+  private void tearDown() {
+    pendingMessages.add("*************************Tear Down*************************");
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      try {
+        deletePubsubTopic(pubsubOptions.getPubsubTopic());
+        pendingMessages.add("The Pub/Sub topic has been deleted: "
+            + pubsubOptions.getPubsubTopic());
+      } catch (IOException e) {
+        pendingMessages.add("Failed to delete the Pub/Sub topic : "
+            + pubsubOptions.getPubsubTopic());
+      }
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        try {
+          deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+          pendingMessages.add("The Pub/Sub subscription has been deleted: "
+              + pubsubOptions.getPubsubSubscription());
+        } catch (IOException e) {
+          pendingMessages.add("Failed to delete the Pub/Sub subscription : "
+              + pubsubOptions.getPubsubSubscription());
+        }
+      }
+    }
+
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("The BigQuery table might contain the example's output, "
+          + "and it is not deleted automatically: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+      pendingMessages.add("Please go to the Developers Console to delete it manually."
+          + " Otherwise, you may be charged for its usage.");
+    }
+  }
+
+  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+      TableSchema schema) throws IOException {
+    if (bigQueryClient == null) {
+      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+    }
+
+    Datasets datasetService = bigQueryClient.datasets();
+    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+      Dataset newDataset = new Dataset().setDatasetReference(
+          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+      datasetService.insert(projectId, newDataset).execute();
+    }
+
+    Tables tableService = bigQueryClient.tables();
+    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+    if (table == null) {
+      Table newTable = new Table().setSchema(schema).setTableReference(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+      tableService.insert(projectId, datasetId, newTable).execute();
+    } else if (!table.getSchema().equals(schema)) {
+      throw new RuntimeException(
+          "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+          + ", actual: " + table.getSchema().toPrettyString());
+    }
+  }
+
+  private void setupPubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+      pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+    }
+  }
+
+  private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
+      Subscription subInfo = new Subscription()
+        .setAckDeadlineSeconds(60)
+        .setTopic(topic);
+      pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub topic.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub topic
+   */
+  private void deletePubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+      pubsubClient.projects().topics().delete(topic).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub subscription.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub subscription
+   */
+  private void deletePubsubSubscription(String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+      pubsubClient.projects().subscriptions().delete(subscription).execute();
+    }
+  }
+
+  /**
+   * Do some runner setup: if streaming is specified and the runner is the DataflowRunner or
+   * the BlockingDataflowRunner, force the DataflowRunner. Other runners are left unchanged.
+   */
+  public void setupRunner() {
+    Class<? extends PipelineRunner<?>> runner = options.getRunner();
+    if (options.isStreaming()
+        && (runner.equals(DataflowRunner.class)
+            || runner.equals(BlockingDataflowRunner.class))) {
+      // In order to cancel the pipelines automatically,
+      // {@literal DataflowRunner} is forced to be used.
+      options.setRunner(DataflowRunner.class);
+    }
+  }
+
+  /**
+   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
+   * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
+   */
+  public void waitToFinish(PipelineResult result) {
+    if (result instanceof DataflowPipelineJob) {
+      final DataflowPipelineJob job = (DataflowPipelineJob) result;
+      jobsToCancel.add(job);
+      if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+        addShutdownHook(jobsToCancel);
+      }
+      try {
+        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
+      }
+    } else {
+      // Nothing to wait for if the given PipelineResult doesn't support waitToFinish(),
+      // such as EvaluationResults returned by DirectRunner; tear down the resources right away.
+      tearDown();
+      printPendingMessages();
+    }
+  }
+
+  private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
+    if (dataflowClient == null) {
+      dataflowClient = options.getDataflowClient();
+    }
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        tearDown();
+        printPendingMessages();
+        for (DataflowPipelineJob job : jobs) {
+          System.out.println("Canceling example pipeline: " + job.getJobId());
+          try {
+            job.cancel();
+          } catch (IOException e) {
+            System.out.println("Failed to cancel the job,"
+                + " please go to the Developers Console to cancel it manually");
+            System.out.println(
+                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+          }
+        }
+
+        for (DataflowPipelineJob job : jobs) {
+          boolean cancellationVerified = false;
+          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+            if (job.getState().isTerminal()) {
+              cancellationVerified = true;
+              System.out.println("Canceled example pipeline: " + job.getJobId());
+              break;
+            } else {
+              System.out.println(
+                  "The example pipeline is still running. Verifying the cancellation.");
+            }
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+          }
+          if (!cancellationVerified) {
+            System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
+            System.out.println("Please go to the Developers Console to verify manually:");
+            System.out.println(
+                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+          }
+        }
+      }
+    });
+  }
+
+  private void printPendingMessages() {
+    System.out.println();
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+    for (String message : pendingMessages) {
+      System.out.println(message);
+    }
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+  }
+
+  private static <T> T executeNullIfNotFound(
+      AbstractGoogleClientRequest<T> request) throws IOException {
+    try {
+      return request.execute();
+    } catch (GoogleJsonResponseException e) {
+      if (e.getStatusCode() == SC_NOT_FOUND) {
+        return null;
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 98c4994..f8cd0f1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -20,8 +20,8 @@ package org.apache.beam.examples.complete;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -451,7 +451,7 @@ public class AutoComplete {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 
     options.setBigQuerySchema(FormatForBigquery.getSchema());
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    ExampleUtils dataflowUtils = new ExampleUtils(options);
 
     // We support running the same pipeline in either
     // batch or windowed streaming mode.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 4ea199c..046428c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.examples.complete;
 
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
@@ -120,7 +120,7 @@ public class StreamingWordExtract {
     options.setStreaming(true);
 
     options.setBigQuerySchema(StringToRowConverter.getSchema());
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    ExampleUtils dataflowUtils = new ExampleUtils(options);
     dataflowUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 2db7c9e..1bbc68b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples.complete;
 
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -307,8 +307,7 @@ public class TrafficMaxLaneFlow {
     *
     * <p>Inherits standard configuration options.
     */
-  private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
-      ExampleBigQueryTableOptions {
+  private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions {
     @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"
         + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
@@ -342,7 +341,7 @@ public class TrafficMaxLaneFlow {
         .as(TrafficMaxLaneFlowOptions.class);
     options.setBigQuerySchema(FormatMaxesFn.getSchema());
     // Using DataflowExampleUtils to set up required resources.
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+    ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
 
     Pipeline pipeline = Pipeline.create(options);
     TableReference tableRef = new TableReference();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 89cfbfc..8af0922 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples.complete;
 
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -317,8 +317,7 @@ public class TrafficRoutes {
   *
   * <p>Inherits standard configuration options.
   */
-  private interface TrafficRoutesOptions extends DataflowExampleOptions,
-      ExampleBigQueryTableOptions {
+  private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions {
     @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"
         + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
@@ -353,7 +352,7 @@ public class TrafficRoutes {
 
     options.setBigQuerySchema(FormatStatsFn.getSchema());
     // Using DataflowExampleUtils to set up required resources.
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+    ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
 
     Pipeline pipeline = Pipeline.create(options);
     TableReference tableRef = new TableReference();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 5e60835..aa91ac6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples.cookbook;
 
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BigQueryIO;
@@ -419,8 +419,7 @@ public class TriggerExample {
   /**
    * Inherits standard configuration options.
    */
-  public interface TrafficFlowOptions
-      extends ExampleBigQueryTableOptions, DataflowExampleOptions {
+  public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions {
 
     @Description("Input file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"
@@ -444,7 +443,7 @@ public class TriggerExample {
 
     options.setBigQuerySchema(getSchema());
 
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    ExampleUtils dataflowUtils = new ExampleUtils(options);
     dataflowUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b1cb312..5b27f83 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.examples.complete.game;
 
-import org.apache.beam.examples.common.DataflowExampleUtils;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -242,7 +242,7 @@ public class GameStats extends LeaderBoard {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    ExampleUtils dataflowUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
     // Read Events from Pub/Sub using custom timestamps

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index a14d533..051b4de 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.examples.complete.game;
 
-import org.apache.beam.examples.common.DataflowExampleOptions;
-import org.apache.beam.examples.common.DataflowExampleUtils;
+import org.apache.beam.examples.common.ExampleOptions;
+import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
@@ -102,7 +102,7 @@ public class LeaderBoard extends HourlyTeamScore {
   /**
    * Options supported by {@link LeaderBoard}.
    */
-  static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
+  static interface Options extends HourlyTeamScore.Options, ExampleOptions {
 
     @Description("Pub/Sub topic to read from")
     @Validation.Required
@@ -178,7 +178,7 @@ public class LeaderBoard extends HourlyTeamScore {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    ExampleUtils dataflowUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
     // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub


[2/2] incubator-beam git commit: Closes #605

Posted by dh...@apache.org.
Closes #605


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/921c55c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/921c55c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/921c55c9

Branch: refs/heads/master
Commit: 921c55c94f72210d010359e628f2dcfe866f84d5
Parents: 290c0b7 a6f488f
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jul 7 22:19:04 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 22:19:04 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java |  12 +-
 .../examples/common/DataflowExampleOptions.java |  37 --
 .../examples/common/DataflowExampleUtils.java   | 404 -------------------
 .../beam/examples/common/ExampleOptions.java    |  37 ++
 .../beam/examples/common/ExampleUtils.java      | 404 +++++++++++++++++++
 .../beam/examples/complete/AutoComplete.java    |   4 +-
 .../examples/complete/StreamingWordExtract.java |   4 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |   9 +-
 .../beam/examples/complete/TrafficRoutes.java   |   9 +-
 .../beam/examples/cookbook/TriggerExample.java  |   9 +-
 .../beam/examples/complete/game/GameStats.java  |   4 +-
 .../examples/complete/game/LeaderBoard.java     |   8 +-
 12 files changed, 467 insertions(+), 474 deletions(-)
----------------------------------------------------------------------