You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/18 19:09:42 UTC
[2/2] incubator-beam git commit: [BEAM-433] Remove references to
JobName in examples pipeline options.
[BEAM-433] Remove references to JobName in examples pipeline options.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7313fb56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7313fb56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7313fb56
Branch: refs/heads/master
Commit: 7313fb56a1311c918da6f07cc01021b196212a47
Parents: f776b11
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 14:05:46 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 18 12:09:32 2016 -0700
----------------------------------------------------------------------
.../common/ExampleBigQueryTableOptions.java | 10 ++--
.../beam/examples/common/ExampleOptions.java | 50 +++++++++++++++++++-
...xamplePubsubTopicAndSubscriptionOptions.java | 10 ++--
.../common/ExamplePubsubTopicOptions.java | 12 ++---
.../beam/examples/complete/AutoComplete.java | 7 +--
.../examples/complete/StreamingWordExtract.java | 5 +-
.../beam/examples/cookbook/TriggerExample.java | 4 +-
.../beam/examples/complete/game/GameStats.java | 4 +-
.../examples/complete/game/LeaderBoard.java | 7 +--
9 files changed, 79 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 36304a8..54cc99e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -17,21 +17,21 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import com.google.api.services.bigquery.model.TableSchema;
/**
- * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * Options that can be used to configure BigQuery tables in Beam examples.
* The project defaults to the project being used to run the example.
*/
-public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+public interface ExampleBigQueryTableOptions extends GcpOptions {
@Description("BigQuery dataset name")
- @Default.String("dataflow_examples")
+ @Default.String("beam_examples")
String getBigQueryDataset();
void setBigQueryDataset(String dataset);
@@ -50,7 +50,7 @@ public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
static class BigQueryTableFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- return options.as(DataflowPipelineOptions.class).getJobName()
+ return options.as(ExampleOptions.class).getNormalizedUniqueName()
.replace('-', '_');
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
index bba7b21..43afeb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -17,14 +17,25 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.concurrent.ThreadLocalRandom;
/**
* Options that can be used to configure the Beam examples.
*/
-public interface ExampleOptions extends DataflowPipelineOptions {
+public interface ExampleOptions extends PipelineOptions {
@Description("Whether to keep jobs running on the Dataflow service after local process exit")
@Default.Boolean(false)
boolean getKeepJobsRunning();
@@ -34,4 +45,39 @@ public interface ExampleOptions extends DataflowPipelineOptions {
@Default.Integer(1)
int getInjectorNumWorkers();
void setInjectorNumWorkers(int numWorkers);
+
+ @Description("A normalized unique name that is used to name anything related to the pipeline."
+ + "It defaults to ApplicationName-UserName-Date-RandomInteger")
+ @Default.InstanceFactory(NormalizedUniqueNameFactory.class)
+ String getNormalizedUniqueName();
+ void setNormalizedUniqueName(String numWorkers);
+
+ /**
+ * Returns a normalized unique name constructed from {@link ApplicationNameOptions#getAppName()},
+ * the local system user name (if available), the current time, and a random integer.
+ *
+ * <p>The normalization makes sure that the name matches the pattern of
+ * [a-z]([-a-z0-9]*[a-z0-9])?.
+ */
+ public static class NormalizedUniqueNameFactory implements DefaultValueFactory<String> {
+ private static final DateTimeFormatter FORMATTER =
+ DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+ // Lower-case fallback and Locale.ROOT casing keep output within [a-z]([-a-z0-9]*[a-z0-9])?.
+ @Override
+ public String create(PipelineOptions options) {
+ String appName = options.as(ApplicationNameOptions.class).getAppName();
+ String normalizedAppName = appName == null || appName.isEmpty() ? "beamapp"
+ : appName.toLowerCase(java.util.Locale.ROOT)
+ .replaceAll("[^a-z0-9]", "0")
+ .replaceAll("^[^a-z]", "a");
+ String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+ String normalizedUserName = userName.toLowerCase(java.util.Locale.ROOT)
+ .replaceAll("[^a-z0-9]", "0");
+ String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+ // Random hex suffix helps keep names distinct when launched within the same second.
+ String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
+ return String.format("%s-%s-%s-%s",
+ normalizedAppName, normalizedUserName, datePart, randomPart);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
index 22bcf4e..36893a3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -17,14 +17,14 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
- * Options that can be used to configure Pub/Sub topic/subscription in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic/subscription in Beam examples.
*/
public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
@Description("Pub/Sub subscription")
@@ -38,10 +38,8 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT
static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowPipelineOptions =
- options.as(DataflowPipelineOptions.class);
- return "projects/" + dataflowPipelineOptions.getProject()
- + "/subscriptions/" + dataflowPipelineOptions.getJobName();
+ return "projects/" + options.as(GcpOptions.class).getProject()
+ + "/subscriptions/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
index 603e309..1c9270b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java
@@ -17,16 +17,16 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
- * Options that can be used to configure Pub/Sub topic in Dataflow examples.
+ * Options that can be used to configure Pub/Sub topic in Beam examples.
*/
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
+public interface ExamplePubsubTopicOptions extends GcpOptions {
@Description("Pub/Sub topic")
@Default.InstanceFactory(PubsubTopicFactory.class)
String getPubsubTopic();
@@ -38,10 +38,8 @@ public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
static class PubsubTopicFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowPipelineOptions =
- options.as(DataflowPipelineOptions.class);
- return "projects/" + dataflowPipelineOptions.getProject()
- + "/topics/" + dataflowPipelineOptions.getJobName();
+ return "projects/" + options.as(GcpOptions.class).getProject()
+ + "/topics/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e6cc0cc..26f6045 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
@@ -48,7 +50,6 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -416,7 +417,8 @@ public class AutoComplete {
*
* <p>Inherits standard Dataflow configuration options.
*/
- private static interface Options extends ExampleBigQueryTableOptions {
+ private static interface Options
+ extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Input text file")
@Validation.Required
String getInputFile();
@@ -455,7 +457,6 @@ public class AutoComplete {
// We support running the same pipeline in either
// batch or windowed streaming mode.
- PTransform<? super PBegin, PCollection<String>> readSource;
WindowFn<Object, ?> windowFn;
if (options.isStreaming()) {
checkArgument(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index df2e36e..d4ba8bd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -18,6 +18,7 @@
package org.apache.beam.examples.complete;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
+import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -101,7 +103,8 @@ public class StreamingWordExtract {
*
* <p>Inherits standard configuration options.
*/
- private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions {
+ private interface StreamingWordExtractOptions
+ extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInputFile();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 922cf23..bbff6f9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -419,7 +420,8 @@ public class TriggerExample {
/**
* Inherits standard configuration options.
*/
- public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions {
+ public interface TrafficFlowOptions
+ extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Input file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 5b27f83..33b8727 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -242,7 +242,7 @@ public class GameStats extends LeaderBoard {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- ExampleUtils dataflowUtils = new ExampleUtils(options);
+ ExampleUtils exampleUtils = new ExampleUtils(options);
Pipeline pipeline = Pipeline.create(options);
// Read Events from Pub/Sub using custom timestamps
@@ -330,6 +330,6 @@ public class GameStats extends LeaderBoard {
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7313fb56/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 051b4de..bd22305 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
@@ -102,7 +103,7 @@ public class LeaderBoard extends HourlyTeamScore {
/**
* Options supported by {@link LeaderBoard}.
*/
- static interface Options extends HourlyTeamScore.Options, ExampleOptions {
+ static interface Options extends HourlyTeamScore.Options, ExampleOptions, StreamingOptions {
@Description("Pub/Sub topic to read from")
@Validation.Required
@@ -178,7 +179,7 @@ public class LeaderBoard extends HourlyTeamScore {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
- ExampleUtils dataflowUtils = new ExampleUtils(options);
+ ExampleUtils exampleUtils = new ExampleUtils(options);
Pipeline pipeline = Pipeline.create(options);
// Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
@@ -231,6 +232,6 @@ public class LeaderBoard extends HourlyTeamScore {
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
}