You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was extracted.)
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/25 20:44:19 UTC
[1/2] incubator-beam git commit: Closes #719
Repository: incubator-beam
Updated Branches:
refs/heads/master 10b3fee34 -> a69a0ea90
Closes #719
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a69a0ea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a69a0ea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a69a0ea9
Branch: refs/heads/master
Commit: a69a0ea90a9dfeb295b9cef0b2f22b3ec91aa2ba
Parents: 10b3fee b04776d
Author: Dan Halperin <dh...@google.com>
Authored: Thu Aug 25 13:44:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Aug 25 13:44:15 2016 -0700
----------------------------------------------------------------------
.../common/ExampleBigQueryTableOptions.java | 3 +-
.../beam/examples/common/ExampleOptions.java | 43 ------------------
...xamplePubsubTopicAndSubscriptionOptions.java | 2 +-
.../common/ExamplePubsubTopicOptions.java | 2 +-
.../runners/flink/FlinkPipelineOptions.java | 34 --------------
.../options/DataflowPipelineOptions.java | 48 --------------------
.../options/DataflowPipelineOptionsTest.java | 42 ++++++++++++++---
.../beam/sdk/options/PipelineOptions.java | 42 +++++++++++++++++
8 files changed, 80 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-545] Promote JobName to PipelineOptions
Posted by dh...@apache.org.
[BEAM-545] Promote JobName to PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b04776d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b04776d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b04776d4
Branch: refs/heads/master
Commit: b04776d4d5235340148233f5f9e3f26f4c01076b
Parents: 10b3fee
Author: Pei He <pe...@google.com>
Authored: Wed Aug 10 15:21:41 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Aug 25 13:44:15 2016 -0700
----------------------------------------------------------------------
.../common/ExampleBigQueryTableOptions.java | 3 +-
.../beam/examples/common/ExampleOptions.java | 43 ------------------
...xamplePubsubTopicAndSubscriptionOptions.java | 2 +-
.../common/ExamplePubsubTopicOptions.java | 2 +-
.../runners/flink/FlinkPipelineOptions.java | 34 --------------
.../options/DataflowPipelineOptions.java | 48 --------------------
.../options/DataflowPipelineOptionsTest.java | 42 ++++++++++++++---
.../beam/sdk/options/PipelineOptions.java | 42 +++++++++++++++++
8 files changed, 80 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 2eef525..5d815c7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends GcpOptions {
static class BigQueryTableFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- return options.as(ExampleOptions.class).getNormalizedUniqueName()
- .replace('-', '_');
+ return options.getJobName().replace('-', '_');
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
index 8b7ed07..85643e4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -17,17 +17,9 @@
*/
package org.apache.beam.examples.common;
-import com.google.common.base.MoreObjects;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
/**
* Options that can be used to configure the Beam examples.
@@ -42,39 +34,4 @@ public interface ExampleOptions extends PipelineOptions {
@Default.Integer(1)
int getInjectorNumWorkers();
void setInjectorNumWorkers(int numWorkers);
-
- @Description("A normalized unique name that is used to name anything related to the pipeline."
- + "It defaults to ApplicationName-UserName-Date-RandomInteger")
- @Default.InstanceFactory(NormalizedUniqueNameFactory.class)
- String getNormalizedUniqueName();
- void setNormalizedUniqueName(String numWorkers);
-
- /**
- * Returns a normalized unique name constructed from {@link ApplicationNameOptions#getAppName()},
- * the local system user name (if available), the current time, and a random integer.
- *
- * <p>The normalization makes sure that the name matches the pattern of
- * [a-z]([-a-z0-9]*[a-z0-9])?.
- */
- public static class NormalizedUniqueNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "BeamApp"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-
- String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
- return String.format("%s-%s-%s-%s",
- normalizedAppName, normalizedUserName, datePart, randomPart);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index 36893a3..7f954a1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -39,7 +39,7 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT
@Override
public String create(PipelineOptions options) {
return "projects/" + options.as(GcpOptions.class).getProject()
- + "/subscriptions/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+ + "/subscriptions/" + options.getJobName();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 1c9270b..71879b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -39,7 +39,7 @@ public interface ExamplePubsubTopicOptions extends GcpOptions {
@Override
public String create(PipelineOptions options) {
return "projects/" + options.as(GcpOptions.class).getProject()
- + "/topics/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+ + "/topics/" + options.getJobName();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6d1a8d0..6561fa5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -22,14 +22,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
/**
* Options which can be used to configure a Flink PipelineRunner.
@@ -50,15 +45,6 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
void setFilesToStage(List<String> value);
/**
- * The job name is used to identify jobs running on a Flink cluster.
- */
- @Description("Flink job name, to uniquely identify active jobs. "
- + "Defaults to using the ApplicationName-UserName-Date.")
- @Default.InstanceFactory(JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
* The url of the Flink JobManager on which to execute pipelines. This can either be
* the the address of a cluster JobManager, in the form "host:port" or one of the special
* Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
@@ -93,24 +79,4 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
-
-
- class JobNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "FlinkRunner"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = System.getProperty("user.name", "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
- return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 8ef43c5..9f58f93 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
-import com.google.common.base.MoreObjects;
import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.options.ApplicationNameOptions;
@@ -35,10 +34,6 @@ import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.util.IOChannelUtils;
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
/**
* Options that can be used to configure the {@link DataflowRunner}.
@@ -76,21 +71,6 @@ public interface DataflowPipelineOptions
void setStagingLocation(String value);
/**
- * The Dataflow job name is used as an idempotence key within the Dataflow service.
- * If there is an existing job that is currently active, another active job with the same
- * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
- */
- @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
- + "For each running job in the same GCP project, jobs with the same name cannot be created "
- + "unless the new job is an explicit update of the previous one. Defaults to using "
- + "ApplicationName-UserName-Date. The job name must match the regular expression "
- + "'[a-z]([-a-z0-9]{0,38}[a-z0-9])?'. The runner will automatically truncate the name of the "
- + "job and convert to lower case.")
- @Default.InstanceFactory(JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
* Whether to update the currently running pipeline with the same name as this one.
*/
@Description(
@@ -100,34 +80,6 @@ public interface DataflowPipelineOptions
void setUpdate(boolean value);
/**
- * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
- * local system user name (if available), and the current time. The normalization makes sure that
- * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
- * characters.
- *
- * <p>This job name factory is only able to generate one unique name per second per application
- * and user combination.
- */
- public static class JobNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
- return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
- }
- }
-
- /**
* Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
*/
public static class StagingLocationFactory implements DefaultValueFactory<String> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 61d3992..202d04b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -53,7 +53,13 @@ public class DataflowPipelineOptionsTest {
System.getProperties().remove("user.name");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setAppName("TestApplication");
- assertEquals("testapplication--1208190706", options.getJobName());
+ String[] nameComponents = options.getJobName().split("-");
+ assertEquals(4, nameComponents.length);
+ assertEquals("testapplication", nameComponents[0]);
+ assertEquals("", nameComponents[1]);
+ assertEquals("1208190706", nameComponents[2]);
+ // Verify the last component is a hex integer (unsigned).
+ Long.parseLong(nameComponents[3], 16);
assertTrue(options.getJobName().length() <= 40);
}
@@ -63,9 +69,13 @@ public class DataflowPipelineOptionsTest {
System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setAppName("1234567890123456789012345678901234567890");
- assertEquals(
- "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
- options.getJobName());
+ String[] nameComponents = options.getJobName().split("-");
+ assertEquals(4, nameComponents.length);
+ assertEquals("a234567890123456789012345678901234567890", nameComponents[0]);
+ assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]);
+ assertEquals("1208190706", nameComponents[2]);
+ // Verify the last component is a hex integer (unsigned).
+ Long.parseLong(nameComponents[3], 16);
}
@Test
@@ -74,7 +84,13 @@ public class DataflowPipelineOptionsTest {
System.getProperties().put("user.name", "abcde");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setAppName("1234567890123456789012345678901234567890");
- assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
+ String[] nameComponents = options.getJobName().split("-");
+ assertEquals(4, nameComponents.length);
+ assertEquals("a234567890123456789012345678901234567890", nameComponents[0]);
+ assertEquals("abcde", nameComponents[1]);
+ assertEquals("1208190706", nameComponents[2]);
+ // Verify the last component is a hex integer (unsigned).
+ Long.parseLong(nameComponents[3], 16);
}
@Test
@@ -83,7 +99,13 @@ public class DataflowPipelineOptionsTest {
System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setAppName("1234567890");
- assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
+ String[] nameComponents = options.getJobName().split("-");
+ assertEquals(4, nameComponents.length);
+ assertEquals("a234567890", nameComponents[0]);
+ assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]);
+ assertEquals("1208190706", nameComponents[2]);
+ // Verify the last component is a hex integer (unsigned).
+ Long.parseLong(nameComponents[3], 16);
}
@Test
@@ -92,7 +114,13 @@ public class DataflowPipelineOptionsTest {
System.getProperties().put("user.name", "\u00f0i \u0131nt\u0259\u02c8n\u00e6\u0283\u0259n\u0259l ");
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setAppName("f\u0259\u02c8n\u025bt\u0131k \u0259so\u028asi\u02c8e\u0131\u0283n");
- assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
+ String[] nameComponents = options.getJobName().split("-");
+ assertEquals(4, nameComponents.length);
+ assertEquals("f00n0t0k00so0si0e00n", nameComponents[0]);
+ assertEquals("0i00nt00n000n0l0", nameComponents[1]);
+ assertEquals("1208190706", nameComponents[2]);
+ // Verify the last component is a hex integer (unsigned).
+ Long.parseLong(nameComponents[3], 16);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b04776d4/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index db54d0a..701ae70 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.auto.service.AutoService;
import java.lang.reflect.Proxy;
+import java.util.Locale;
import java.util.ServiceLoader;
+import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
@@ -33,6 +35,10 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
/**
* PipelineOptions are used to configure Pipelines. You can extend {@link PipelineOptions}
@@ -258,6 +264,17 @@ public interface PipelineOptions extends HasDisplayData {
String getTempLocation();
void setTempLocation(String value);
+ /**
+ * Name of this pipeline execution; runners may use it to identify the running job. When it is
+ * not set explicitly, a name is generated by {@link JobNameFactory}.
+ */
+ @Description("Name of the pipeline execution. It should match the regular expression "
+ + "'[a-z]([-a-z0-9]*[a-z0-9])?'; individual runners may impose further restrictions, "
+ + "such as a maximum length. It defaults to ApplicationName-UserName-Date-RandomInteger.")
+ @Default.InstanceFactory(JobNameFactory.class)
+ String getJobName();
+ void setJobName(String value);
+
/**
* A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
* on the classpath, and throws an exception otherwise.
@@ -284,4 +301,33 @@ public interface PipelineOptions extends HasDisplayData {
}
}
}
+
+ /**
+ * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}
+ * (or "beamapp" when unset), the local user name (if available), the UTC time, and a random int.
+ *
+ * <p>Letters are lower-cased with {@link Locale#ROOT} so the result does not depend on the default
+ * locale, and the name matches the pattern [a-z]([-a-z0-9]*[a-z0-9])?.
+ */
+ static class JobNameFactory implements DefaultValueFactory<String> {
+ private static final DateTimeFormatter FORMATTER =
+ DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+ @Override
+ public String create(PipelineOptions options) {
+ String appName = options.as(ApplicationNameOptions.class).getAppName();
+ String normalizedAppName = appName == null || appName.isEmpty() ? "beamapp"
+ : appName.toLowerCase(Locale.ROOT)
+ .replaceAll("[^a-z0-9]", "0")
+ .replaceAll("^[^a-z]", "a");
+ String userName = System.getProperty("user.name", "");
+ String normalizedUserName = userName.toLowerCase(Locale.ROOT)
+ .replaceAll("[^a-z0-9]", "0");
+ String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+ // Random suffix makes name collisions for jobs launched within the same second unlikely.
+ String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
+ return String.format("%s-%s-%s-%s",
+ normalizedAppName, normalizedUserName, datePart, randomPart);
+ }
+ }
}