You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/10 18:14:38 UTC
[1/2] beam git commit: Use text output for first two mobile gaming
examples
Repository: beam
Updated Branches:
refs/heads/master c54670fc2 -> a4f7a9c3f
Use text output for first two mobile gaming examples
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/758fee8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/758fee8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/758fee8e
Branch: refs/heads/master
Commit: 758fee8e95bbbda985988b6ea92f6c7a741bf74d
Parents: c54670f
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 9 16:45:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 10 11:12:20 2017 -0700
----------------------------------------------------------------------
.../examples/complete/game/HourlyTeamScore.java | 57 ++----
.../examples/complete/game/LeaderBoard.java | 26 +++
.../beam/examples/complete/game/UserScore.java | 56 +++---
.../complete/game/utils/WriteToText.java | 184 +++++++++++++++++++
4 files changed, 251 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 2928882..6a322da 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -20,9 +20,8 @@ package org.apache.beam.examples.complete.game;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
-import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
+import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -58,12 +57,11 @@ import org.joda.time.format.DateTimeFormatter;
* like this:
* <pre>{@code
* --project=YOUR_PROJECT_ID
- * --tempLocation=gs://YOUR_TEMP_DIRECTORY
- * --runner=BlockingDataflowRunner
- * --dataset=YOUR-DATASET
+ * --tempLocation=YOUR_TEMP_DIRECTORY
+ * --runner=YOUR_RUNNER
+ * --output=YOUR_OUTPUT_DIRECTORY
* }
* </pre>
- * where the BigQuery dataset you specify must already exist.
*
* <p>Optionally include {@code --input} to specify the batch input file path.
* To indicate a time after which the data should be filtered out, include the
@@ -107,39 +105,26 @@ public class HourlyTeamScore extends UserScore {
@Default.String("2100-01-01-00-00")
String getStopMin();
void setStopMin(String value);
-
- @Description("The BigQuery table name. Should not already exist.")
- @Default.String("hourly_team_score")
- String getHourlyTeamScoreTableName();
- void setHourlyTeamScoreTableName(String value);
}
/**
- * Create a map of information that describes how to write pipeline output to BigQuery. This map
- * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+ * Create a map of information that describes how to write pipeline output to text. This map
+ * is passed to the {@link WriteToText} constructor to write team score sums and
* includes information about window start time.
*/
- protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
- configureWindowedTableWrite() {
- Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
- new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfig.put(
- "team",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", (c, w) -> c.element().getKey()));
- tableConfig.put(
- "total_score",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "INTEGER", (c, w) -> c.element().getValue()));
- tableConfig.put(
+ protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>>
+ configureOutput() {
+ Map<String, WriteToText.FieldFn<KV<String, Integer>>> config =
+ new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>();
+ config.put("team", (c, w) -> c.element().getKey());
+ config.put("total_score", (c, w) -> c.element().getValue());
+ config.put(
"window_start",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING",
- (c, w) -> {
+ (c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return fmt.print(window.start());
- }));
- return tableConfig;
+ });
+ return config;
}
@@ -186,12 +171,10 @@ public class HourlyTeamScore extends UserScore {
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
.apply("WriteTeamScoreSums",
- new WriteWindowedToBigQuery<KV<String, Integer>>(
- options.as(GcpOptions.class).getProject(),
- options.getDataset(),
- options.getHourlyTeamScoreTableName(),
- configureWindowedTableWrite()));
-
+ new WriteToText<KV<String, Integer>>(
+ options.getOutput(),
+ configureOutput(),
+ true));
pipeline.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index bfad9f6..f673a8d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -106,6 +106,11 @@ public class LeaderBoard extends HourlyTeamScore {
*/
interface Options extends HourlyTeamScore.Options, ExampleOptions, StreamingOptions {
+ @Description("BigQuery Dataset to write tables to. Must already exist.")
+ @Validation.Required
+ String getDataset();
+ void setDataset(String value);
+
@Description("Pub/Sub topic to read from")
@Validation.Required
String getTopic();
@@ -163,6 +168,27 @@ public class LeaderBoard extends HourlyTeamScore {
return tableConfigure;
}
+
+  /**
+   * Create a map of information that describes how to write per-user score sums to BigQuery.
+   * The map has a STRING {@code user} column holding each element's key and an INTEGER
+   * {@code total_score} column holding the element's value. A new, mutable {@code HashMap} is
+   * returned on every call; it is intended to be passed to the {@link WriteToBigQuery}
+   * constructor.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+      configureBigQueryWrite() {
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> columns = new HashMap<>();
+    WriteToBigQuery.FieldInfo<KV<String, Integer>> userColumn =
+        new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey());
+    WriteToBigQuery.FieldInfo<KV<String, Integer>> totalScoreColumn =
+        new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue());
+    columns.put("user", userColumn);
+    columns.put("total_score", totalScoreColumn);
+    return columns;
+  }
+
+
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is used to write user score sums.
http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 8110146..7297bcd 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -20,11 +20,10 @@ package org.apache.beam.examples.complete.game;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.reflect.Nullable;
-import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
+import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -62,9 +61,9 @@ import org.slf4j.LoggerFactory;
* the pipeline configuration like this:
* <pre>{@code
* --project=YOUR_PROJECT_ID
- * --tempLocation=gs://YOUR_TEMP_DIRECTORY
- * --runner=BlockingDataflowRunner
- * --dataset=YOUR-DATASET
+ * --tempLocation=YOUR_TEMP_DIRECTORY
+ * --runner=YOUR_RUNNER
+ * --output=YOUR_OUTPUT_DIRECTORY
* }
* </pre>
* where the BigQuery dataset you specify must already exist.
@@ -186,37 +185,26 @@ public class UserScore {
String getInput();
void setInput(String value);
- @Description("BigQuery Dataset to write tables to. Must already exist.")
+ // Set this required option to specify where to write the output.
+ @Description("Path of the file to write to.")
@Validation.Required
- String getDataset();
- void setDataset(String value);
-
- @Description("The BigQuery table name. Should not already exist.")
- @Default.String("user_score")
- String getUserScoreTableName();
- void setUserScoreTableName(String value);
+ String getOutput();
+ void setOutput(String value);
}
/**
- * Create a map of information that describes how to write pipeline output to BigQuery. This map
- * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
+ * Create a map of information that describes how to write pipeline output to text. This map
+ * is passed to the {@link WriteToText} constructor to write user score sums.
*/
- protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
- configureBigQueryWrite() {
- Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
- new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfigure.put(
- "user",
- new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", (c, w) -> c.element().getKey()));
- tableConfigure.put(
- "total_score",
- new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
- "INTEGER", (c, w) -> c.element().getValue()));
- return tableConfigure;
+ protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>>
+ configureOutput() {
+ Map<String, WriteToText.FieldFn<KV<String, Integer>>> config =
+ new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>();
+ config.put("user", (c, w) -> c.element().getKey());
+ config.put("total_score", (c, w) -> c.element().getValue());
+ return config;
}
-
/**
* Run a batch pipeline.
*/
@@ -234,15 +222,13 @@ public class UserScore {
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
.apply(
"WriteUserScoreSums",
- new WriteToBigQuery<KV<String, Integer>>(
- options.as(GcpOptions.class).getProject(),
- options.getDataset(),
- options.getUserScoreTableName(),
- configureBigQueryWrite()));
+ new WriteToText<KV<String, Integer>>(
+ options.getOutput(),
+ configureOutput(),
+ false));
// Run the batch pipeline.
pipeline.run().waitUntilFinish();
}
// [END DocInclude_USMain]
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
new file mode 100644
index 0000000..e6c8ddb
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Generates, formats, and writes rows as text. Each row joins {@code "name: value"} pairs with
+ * {@code ", "}, one pair per provided field name, using the provided lambdas to compute values.
+ */
+public class WriteToText<InputT>
+    extends PTransform<PCollection<InputT>, PDone> {
+
+  // US Pacific time; a region ID avoids the deprecated, ambiguous three-letter ID "PST".
+  private static final DateTimeFormatter formatter =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
+
+  protected String filenamePrefix;
+  protected Map<String, FieldFn<InputT>> fieldFn;
+  protected boolean windowed;
+
+  public WriteToText() {}
+
+  /**
+   * Creates a transform that writes formatted rows under {@code filenamePrefix}.
+   *
+   * @param filenamePrefix output path prefix, optionally including a directory component
+   * @param fieldFn field name to value function; fields are emitted in map iteration order
+   * @param windowed if true, writes separate files for each {@link IntervalWindow}
+   */
+  public WriteToText(
+      String filenamePrefix, Map<String, FieldFn<InputT>> fieldFn, boolean windowed) {
+    this.filenamePrefix = filenamePrefix;
+    this.fieldFn = fieldFn;
+    this.windowed = windowed;
+  }
+
+  /**
+   * A {@link Serializable} function from a {@link DoFn.ProcessContext}
+   * and {@link BoundedWindow} to the value for that field.
+   */
+  public interface FieldFn<InputT> extends Serializable {
+    Object apply(DoFn<InputT, String>.ProcessContext context, BoundedWindow window);
+  }
+
+  /** Converts each element into a row of {@code "name: value"} pairs as specified by fieldFn. */
+  protected class BuildRowFn extends DoFn<InputT, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      List<String> fields = new ArrayList<>(fieldFn.size());
+      for (Map.Entry<String, FieldFn<InputT>> entry : fieldFn.entrySet()) {
+        fields.add(entry.getKey() + ": " + entry.getValue().apply(c, window));
+      }
+      c.output(fields.stream().collect(Collectors.joining(", ")));
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes elements to files with names deterministically derived from
+   * the lower and upper bounds of their window (an {@link IntervalWindow}).
+   */
+  protected class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
+
+    private final String filenamePrefix;
+
+    public WriteOneFilePerWindow(String filenamePrefix) {
+      this.filenamePrefix = filenamePrefix;
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      // PerWindowFiles casts every window to IntervalWindow, so reject other window types here.
+      checkArgument(
+          input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder(),
+          "WriteOneFilePerWindow requires input windowed into IntervalWindows");
+
+      // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+      // component from that path for the PerWindowFiles.
+      String prefix = "";
+      ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+      if (!resource.isDirectory()) {
+        prefix = verifyNotNull(resource.getFilename(),
+            "A non-directory resource should have a non-null filename: %s", resource);
+      }
+
+      return input.apply(
+          TextIO.write()
+              .to(resource.getCurrentDirectory())
+              .withFilenamePolicy(new PerWindowFiles(prefix))
+              .withWindowedWrites()
+              .withNumShards(3));
+    }
+  }
+
+  /**
+   * A {@link FilenamePolicy} that names each output file after the start and end of its
+   * {@link IntervalWindow} (printed in US Pacific time), followed by the shard number and the
+   * total number of shards. Only windowed writes are supported.
+   */
+  protected static class PerWindowFiles extends FilenamePolicy {
+
+    private final String prefix;
+
+    public PerWindowFiles(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format("%s-%s-%s",
+          prefix, formatter.print(window.start()), formatter.print(window.end()));
+    }
+
+    @Override
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
+      IntervalWindow window = (IntervalWindow) context.getWindow();
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    }
+
+    @Override
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
+      throw new UnsupportedOperationException("Unsupported.");
+    }
+  }
+
+  @Override
+  public PDone expand(PCollection<InputT> teamAndScore) {
+    // Fail at pipeline construction, not on workers, if the no-arg constructor left these unset.
+    checkArgument(filenamePrefix != null, "filenamePrefix must be set");
+    checkArgument(fieldFn != null, "fieldFn must be set");
+    PCollection<String> rows = teamAndScore.apply("ConvertToRow", ParDo.of(new BuildRowFn()));
+    if (windowed) {
+      rows.apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
+    } else {
+      rows.apply(TextIO.write().to(filenamePrefix));
+    }
+    return PDone.in(teamAndScore.getPipeline());
+  }
+}
[2/2] beam git commit: This closes #3024
Posted by al...@apache.org.
This closes #3024
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a4f7a9c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a4f7a9c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a4f7a9c3
Branch: refs/heads/master
Commit: a4f7a9c3f8997c3ff6dfe6cd6e1e42b451f8affe
Parents: c54670f 758fee8
Author: Ahmet Altay <al...@google.com>
Authored: Wed May 10 11:14:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 10 11:14:26 2017 -0700
----------------------------------------------------------------------
.../examples/complete/game/HourlyTeamScore.java | 57 ++----
.../examples/complete/game/LeaderBoard.java | 26 +++
.../beam/examples/complete/game/UserScore.java | 56 +++---
.../complete/game/utils/WriteToText.java | 184 +++++++++++++++++++
4 files changed, 251 insertions(+), 72 deletions(-)
----------------------------------------------------------------------