You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/25 20:44:19 UTC

[1/2] incubator-beam git commit: Closes #719

Repository: incubator-beam
Updated Branches:
  refs/heads/master 10b3fee34 -> a69a0ea90


Closes #719


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a69a0ea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a69a0ea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a69a0ea9

Branch: refs/heads/master
Commit: a69a0ea90a9dfeb295b9cef0b2f22b3ec91aa2ba
Parents: 10b3fee b04776d
Author: Dan Halperin <dh...@google.com>
Authored: Thu Aug 25 13:44:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Aug 25 13:44:15 2016 -0700

----------------------------------------------------------------------
 .../common/ExampleBigQueryTableOptions.java     |  3 +-
 .../beam/examples/common/ExampleOptions.java    | 43 ------------------
 ...xamplePubsubTopicAndSubscriptionOptions.java |  2 +-
 .../common/ExamplePubsubTopicOptions.java       |  2 +-
 .../runners/flink/FlinkPipelineOptions.java     | 34 --------------
 .../options/DataflowPipelineOptions.java        | 48 --------------------
 .../options/DataflowPipelineOptionsTest.java    | 42 ++++++++++++++---
 .../beam/sdk/options/PipelineOptions.java       | 42 +++++++++++++++++
 8 files changed, 80 insertions(+), 136 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-545] Promote JobName to PipelineOptions

Posted by dh...@apache.org.
[BEAM-545] Promote JobName to PipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b04776d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b04776d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b04776d4

Branch: refs/heads/master
Commit: b04776d4d5235340148233f5f9e3f26f4c01076b
Parents: 10b3fee
Author: Pei He <pe...@google.com>
Authored: Wed Aug 10 15:21:41 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Aug 25 13:44:15 2016 -0700

----------------------------------------------------------------------
 .../common/ExampleBigQueryTableOptions.java     |  3 +-
 .../beam/examples/common/ExampleOptions.java    | 43 ------------------
 ...xamplePubsubTopicAndSubscriptionOptions.java |  2 +-
 .../common/ExamplePubsubTopicOptions.java       |  2 +-
 .../runners/flink/FlinkPipelineOptions.java     | 34 --------------
 .../options/DataflowPipelineOptions.java        | 48 --------------------
 .../options/DataflowPipelineOptionsTest.java    | 42 ++++++++++++++---
 .../beam/sdk/options/PipelineOptions.java       | 42 +++++++++++++++++
 8 files changed, 80 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 2eef525..5d815c7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends GcpOptions {
   static class BigQueryTableFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      return options.as(ExampleOptions.class).getNormalizedUniqueName()
-          .replace('-', '_');
+      return options.getJobName().replace('-', '_');
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
index 8b7ed07..85643e4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -17,17 +17,9 @@
  */
 package org.apache.beam.examples.common;
 
-import com.google.common.base.MoreObjects;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 /**
  * Options that can be used to configure the Beam examples.
@@ -42,39 +34,4 @@ public interface ExampleOptions extends PipelineOptions {
   @Default.Integer(1)
   int getInjectorNumWorkers();
   void setInjectorNumWorkers(int numWorkers);
-
-  @Description("A normalized unique name that is used to name anything related to the pipeline."
-      + "It defaults to ApplicationName-UserName-Date-RandomInteger")
-  @Default.InstanceFactory(NormalizedUniqueNameFactory.class)
-  String getNormalizedUniqueName();
-  void setNormalizedUniqueName(String numWorkers);
-
-  /**
-   * Returns a normalized unique name constructed from {@link ApplicationNameOptions#getAppName()},
-   * the local system user name (if available), the current time, and a random integer.
-   *
-   * <p>The normalization makes sure that the name matches the pattern of
-   * [a-z]([-a-z0-9]*[a-z0-9])?.
-   */
-  public static class NormalizedUniqueNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "BeamApp"
-          : appName.toLowerCase()
-                   .replaceAll("[^a-z0-9]", "0")
-                   .replaceAll("^[^a-z]", "a");
-      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
-      String normalizedUserName = userName.toLowerCase()
-                                          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-
-      String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
-      return String.format("%s-%s-%s-%s",
-          normalizedAppName, normalizedUserName, datePart, randomPart);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index 36893a3..7f954a1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -39,7 +39,7 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT
     @Override
     public String create(PipelineOptions options) {
       return "projects/" + options.as(GcpOptions.class).getProject()
-          + "/subscriptions/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+          + "/subscriptions/" + options.getJobName();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 1c9270b..71879b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -39,7 +39,7 @@ public interface ExamplePubsubTopicOptions extends GcpOptions {
     @Override
     public String create(PipelineOptions options) {
       return "projects/" + options.as(GcpOptions.class).getProject()
-          + "/topics/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+          + "/topics/" + options.getJobName();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6d1a8d0..6561fa5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -22,14 +22,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.List;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -50,15 +45,6 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   void setFilesToStage(List<String> value);
 
   /**
-   * The job name is used to identify jobs running on a Flink cluster.
-   */
-  @Description("Flink job name, to uniquely identify active jobs. "
-      + "Defaults to using the ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
    * The url of the Flink JobManager on which to execute pipelines. This can either be
    * the the address of a cluster JobManager, in the form "host:port" or one of the special
    * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
@@ -93,24 +79,4 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   @Default.Long(-1L)
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
-
-
-  class JobNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "FlinkRunner"
-          : appName.toLowerCase()
-          .replaceAll("[^a-z0-9]", "0")
-          .replaceAll("^[^a-z]", "a");
-      String userName = System.getProperty("user.name", "");
-      String normalizedUserName = userName.toLowerCase()
-          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 8ef43c5..9f58f93 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
 
-import com.google.common.base.MoreObjects;
 import java.io.IOException;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
@@ -35,10 +34,6 @@ import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 /**
  * Options that can be used to configure the {@link DataflowRunner}.
@@ -76,21 +71,6 @@ public interface DataflowPipelineOptions
   void setStagingLocation(String value);
 
   /**
-   * The Dataflow job name is used as an idempotence key within the Dataflow service.
-   * If there is an existing job that is currently active, another active job with the same
-   * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
-   */
-  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
-      + "For each running job in the same GCP project, jobs with the same name cannot be created "
-      + "unless the new job is an explicit update of the previous one. Defaults to using "
-      + "ApplicationName-UserName-Date. The job name must match the regular expression "
-      + "'[a-z]([-a-z0-9]{0,38}[a-z0-9])?'. The runner will automatically truncate the name of the "
-      + "job and convert to lower case.")
-  @Default.InstanceFactory(JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
    * Whether to update the currently running pipeline with the same name as this one.
    */
   @Description(
@@ -100,34 +80,6 @@ public interface DataflowPipelineOptions
   void setUpdate(boolean value);
 
   /**
-   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
-   * local system user name (if available), and the current time. The normalization makes sure that
-   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
-   * characters.
-   *
-   * <p>This job name factory is only able to generate one unique name per second per application
-   * and user combination.
-   */
-  public static class JobNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
-          : appName.toLowerCase()
-                   .replaceAll("[^a-z0-9]", "0")
-                   .replaceAll("^[^a-z]", "a");
-      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
-      String normalizedUserName = userName.toLowerCase()
-                                          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
-    }
-  }
-
-  /**
    * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
    */
   public static class StagingLocationFactory implements DefaultValueFactory<String> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 61d3992..202d04b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -53,7 +53,13 @@ public class DataflowPipelineOptionsTest {
     System.getProperties().remove("user.name");
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setAppName("TestApplication");
-    assertEquals("testapplication--1208190706", options.getJobName());
+    String[] nameComponents = options.getJobName().split("-");
+    assertEquals(4, nameComponents.length);
+    assertEquals("testapplication", nameComponents[0]);
+    assertEquals("", nameComponents[1]);
+    assertEquals("1208190706", nameComponents[2]);
+    // Verify the last component is a hex integer (unsigned).
+    Long.parseLong(nameComponents[3], 16);
     assertTrue(options.getJobName().length() <= 40);
   }
 
@@ -63,9 +69,13 @@ public class DataflowPipelineOptionsTest {
     System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals(
-        "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
-        options.getJobName());
+    String[] nameComponents = options.getJobName().split("-");
+    assertEquals(4, nameComponents.length);
+    assertEquals("a234567890123456789012345678901234567890", nameComponents[0]);
+    assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]);
+    assertEquals("1208190706", nameComponents[2]);
+    // Verify the last component is a hex integer (unsigned).
+    Long.parseLong(nameComponents[3], 16);
   }
 
   @Test
@@ -74,7 +84,13 @@ public class DataflowPipelineOptionsTest {
     System.getProperties().put("user.name", "abcde");
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
+    String[] nameComponents = options.getJobName().split("-");
+    assertEquals(4, nameComponents.length);
+    assertEquals("a234567890123456789012345678901234567890", nameComponents[0]);
+    assertEquals("abcde", nameComponents[1]);
+    assertEquals("1208190706", nameComponents[2]);
+    // Verify the last component is a hex integer (unsigned).
+    Long.parseLong(nameComponents[3], 16);
   }
 
   @Test
@@ -83,7 +99,13 @@ public class DataflowPipelineOptionsTest {
     System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setAppName("1234567890");
-    assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
+    String[] nameComponents = options.getJobName().split("-");
+    assertEquals(4, nameComponents.length);
+    assertEquals("a234567890", nameComponents[0]);
+    assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]);
+    assertEquals("1208190706", nameComponents[2]);
+    // Verify the last component is a hex integer (unsigned).
+    Long.parseLong(nameComponents[3], 16);
   }
 
   @Test
@@ -92,7 +114,13 @@ public class DataflowPipelineOptionsTest {
     System.getProperties().put("user.name", "�i \u0131nt\u0259\u02c8n�\u0283\u0259n\u0259l ");
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setAppName("f\u0259\u02c8n\u025bt\u0131k \u0259so\u028asi\u02c8e\u0131\u0283n");
-    assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
+    String[] nameComponents = options.getJobName().split("-");
+    assertEquals(4, nameComponents.length);
+    assertEquals("f00n0t0k00so0si0e00n", nameComponents[0]);
+    assertEquals("0i00nt00n000n0l0", nameComponents[1]);
+    assertEquals("1208190706", nameComponents[2]);
+    // Verify the last component is a hex integer (unsigned).
+    Long.parseLong(nameComponents[3], 16);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index db54d0a..701ae70 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.google.auto.service.AutoService;
+import com.google.common.base.MoreObjects;
 import java.lang.reflect.Proxy;
 import java.util.ServiceLoader;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
@@ -33,6 +35,10 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 /**
  * PipelineOptions are used to configure Pipelines. You can extend {@link PipelineOptions}
@@ -258,6 +264,13 @@ public interface PipelineOptions extends HasDisplayData {
   String getTempLocation();
   void setTempLocation(String value);
 
+  @Description("Name of the pipeline execution."
+      + "It must match the regular expression '[a-z]([-a-z0-9]{0,38}[a-z0-9])?'."
+      + "It defaults to ApplicationName-UserName-Date-RandomInteger")
+  @Default.InstanceFactory(JobNameFactory.class)
+  String getJobName();
+  void setJobName(String numWorkers);
+
   /**
    * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
    * on the classpath, and throws an exception otherwise.
@@ -284,4 +297,33 @@ public interface PipelineOptions extends HasDisplayData {
       }
     }
   }
+
+  /**
+   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()},
+   * the local system user name (if available), the current time, and a random integer.
+   *
+   * <p>The normalization makes sure that the name matches the pattern of
+   * [a-z]([-a-z0-9]*[a-z0-9])?.
+   */
+  static class JobNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      String normalizedAppName = appName == null || appName.length() == 0 ? "BeamApp"
+          : appName.toLowerCase()
+                   .replaceAll("[^a-z0-9]", "0")
+                   .replaceAll("^[^a-z]", "a");
+      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+      String normalizedUserName = userName.toLowerCase()
+                                          .replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+
+      String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
+      return String.format("%s-%s-%s-%s",
+          normalizedAppName, normalizedUserName, datePart, randomPart);
+    }
+  }
 }