You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/18 19:09:42 UTC

[2/2] incubator-beam git commit: [BEAM-433] Remove references to JobName in examples pipeline options.

[BEAM-433] Remove references to JobName in examples pipeline options.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7313fb56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7313fb56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7313fb56

Branch: refs/heads/master
Commit: 7313fb56a1311c918da6f07cc01021b196212a47
Parents: f776b11
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 14:05:46 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 18 12:09:32 2016 -0700

----------------------------------------------------------------------
 .../common/ExampleBigQueryTableOptions.java     | 10 ++--
 .../beam/examples/common/ExampleOptions.java    | 50 +++++++++++++++++++-
 ...xamplePubsubTopicAndSubscriptionOptions.java | 10 ++--
 .../common/ExamplePubsubTopicOptions.java       | 12 ++---
 .../beam/examples/complete/AutoComplete.java    |  7 +--
 .../examples/complete/StreamingWordExtract.java |  5 +-
 .../beam/examples/cookbook/TriggerExample.java  |  4 +-
 .../beam/examples/complete/game/GameStats.java  |  4 +-
 .../examples/complete/game/LeaderBoard.java     |  7 +--
 9 files changed, 79 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 36304a8..54cc99e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -17,21 +17,21 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 import com.google.api.services.bigquery.model.TableSchema;
 
 /**
- * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * Options that can be used to configure BigQuery tables in Beam examples.
  * The project defaults to the project being used to run the example.
  */
-public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+public interface ExampleBigQueryTableOptions extends GcpOptions {
   @Description("BigQuery dataset name")
-  @Default.String("dataflow_examples")
+  @Default.String("beam_examples")
   String getBigQueryDataset();
   void setBigQueryDataset(String dataset);
 
@@ -50,7 +50,7 @@ public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
   static class BigQueryTableFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      return options.as(DataflowPipelineOptions.class).getJobName()
+      return options.as(ExampleOptions.class).getNormalizedUniqueName()
           .replace('-', '_');
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
index bba7b21..43afeb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -17,14 +17,25 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Options that can be used to configure the Beam examples.
  */
-public interface ExampleOptions extends DataflowPipelineOptions {
+public interface ExampleOptions extends PipelineOptions {
   @Description("Whether to keep jobs running on the Dataflow service after local process exit")
   @Default.Boolean(false)
   boolean getKeepJobsRunning();
@@ -34,4 +45,39 @@ public interface ExampleOptions extends DataflowPipelineOptions {
   @Default.Integer(1)
   int getInjectorNumWorkers();
   void setInjectorNumWorkers(int numWorkers);
+
+  @Description("A normalized unique name that is used to name anything related to the pipeline. "
+      + "It defaults to ApplicationName-UserName-Date-RandomInteger")
+  @Default.InstanceFactory(NormalizedUniqueNameFactory.class)
+  String getNormalizedUniqueName();
+  void setNormalizedUniqueName(String normalizedUniqueName);
+
+  /**
+   * Creates the default unique name, {@code appName-userName-date-random}, from
+   * {@link ApplicationNameOptions#getAppName()}, the {@code user.name} system property (empty
+   * when unset), the current UTC time as {@code MMddHHmmss}, and a random integer in hex.
+   *
+   * <p>Each part is normalized so the result matches [a-z]([-a-z0-9]*[a-z0-9])?.
+   */
+  public static class NormalizedUniqueNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
+      String normalizedAppName = appName == null || appName.length() == 0 ? "beamapp"
+          : appName.toLowerCase(java.util.Locale.ROOT)
+                   .replaceAll("[^a-z0-9]", "0")
+                   .replaceAll("^[^a-z]", "a");
+      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+      String normalizedUserName =
+          userName.toLowerCase(java.util.Locale.ROOT).replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+      String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
+      return String.format("%s-%s-%s-%s",
+          normalizedAppName, normalizedUserName, datePart, randomPart);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index 22bcf4e..36893a3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -17,14 +17,14 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * Options that can be used to configure Pub/Sub topic/subscription in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic/subscription in Beam examples.
  */
 public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
   @Description("Pub/Sub subscription")
@@ -38,10 +38,8 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT
   static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowPipelineOptions =
-          options.as(DataflowPipelineOptions.class);
-      return "projects/" + dataflowPipelineOptions.getProject()
-          + "/subscriptions/" + dataflowPipelineOptions.getJobName();
+      return "projects/" + options.as(GcpOptions.class).getProject()
+          + "/subscriptions/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 603e309..1c9270b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -17,16 +17,16 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
- * Options that can be used to configure Pub/Sub topic in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic in Beam examples.
  */
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
+public interface ExamplePubsubTopicOptions extends GcpOptions {
   @Description("Pub/Sub topic")
   @Default.InstanceFactory(PubsubTopicFactory.class)
   String getPubsubTopic();
@@ -38,10 +38,8 @@ public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
   static class PubsubTopicFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowPipelineOptions =
-          options.as(DataflowPipelineOptions.class);
-      return "projects/" + dataflowPipelineOptions.getProject()
-          + "/topics/" + dataflowPipelineOptions.getJobName();
+      return "projects/" + options.as(GcpOptions.class).getProject()
+          + "/topics/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e6cc0cc..26f6045 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
 import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,7 +50,6 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
@@ -416,7 +417,8 @@ public class AutoComplete {
    *
    * <p>Inherits standard Dataflow configuration options.
    */
-  private static interface Options extends ExampleBigQueryTableOptions {
+  private static interface Options
+      extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
     @Description("Input text file")
     @Validation.Required
     String getInputFile();
@@ -455,7 +457,6 @@ public class AutoComplete {
 
     // We support running the same pipeline in either
     // batch or windowed streaming mode.
-    PTransform<? super PBegin, PCollection<String>> readSource;
     WindowFn<Object, ?> windowFn;
     if (options.isStreaming()) {
       checkArgument(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index df2e36e..d4ba8bd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -18,6 +18,7 @@
 package org.apache.beam.examples.complete;
 
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
 import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
@@ -101,7 +103,8 @@ public class StreamingWordExtract {
    *
    * <p>Inherits standard configuration options.
    */
-  private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions {
+  private interface StreamingWordExtractOptions
+      extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
     @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
     String getInputFile();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 922cf23..bbff6f9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -419,7 +420,8 @@ public class TriggerExample {
   /**
    * Inherits standard configuration options.
    */
-  public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions {
+  public interface TrafficFlowOptions
+      extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
 
     @Description("Input file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 5b27f83..33b8727 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -242,7 +242,7 @@ public class GameStats extends LeaderBoard {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    ExampleUtils dataflowUtils = new ExampleUtils(options);
+    ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
     // Read Events from Pub/Sub using custom timestamps
@@ -330,6 +330,6 @@ public class GameStats extends LeaderBoard {
     // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
     // command line.
     PipelineResult result = pipeline.run();
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 051b4de..bd22305 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
@@ -102,7 +103,7 @@ public class LeaderBoard extends HourlyTeamScore {
   /**
    * Options supported by {@link LeaderBoard}.
    */
-  static interface Options extends HourlyTeamScore.Options, ExampleOptions {
+  static interface Options extends HourlyTeamScore.Options, ExampleOptions, StreamingOptions {
 
     @Description("Pub/Sub topic to read from")
     @Validation.Required
@@ -178,7 +179,7 @@ public class LeaderBoard extends HourlyTeamScore {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    ExampleUtils dataflowUtils = new ExampleUtils(options);
+    ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
     // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
@@ -231,6 +232,6 @@ public class LeaderBoard extends HourlyTeamScore {
     // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
     // command line.
     PipelineResult result = pipeline.run();
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 }