You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/10 18:14:38 UTC

[1/2] beam git commit: Use text output for first two mobile gaming examples

Repository: beam
Updated Branches:
  refs/heads/master c54670fc2 -> a4f7a9c3f


Use text output for first two mobile gaming examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/758fee8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/758fee8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/758fee8e

Branch: refs/heads/master
Commit: 758fee8e95bbbda985988b6ea92f6c7a741bf74d
Parents: c54670f
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 9 16:45:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 10 11:12:20 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/HourlyTeamScore.java |  57 ++----
 .../examples/complete/game/LeaderBoard.java     |  26 +++
 .../beam/examples/complete/game/UserScore.java  |  56 +++---
 .../complete/game/utils/WriteToText.java        | 184 +++++++++++++++++++
 4 files changed, 251 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 2928882..6a322da 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -20,9 +20,8 @@ package org.apache.beam.examples.complete.game;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
-import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
+import org.apache.beam.examples.complete.game.utils.WriteToText;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -58,12 +57,11 @@ import org.joda.time.format.DateTimeFormatter;
  * like this:
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
- *   --tempLocation=gs://YOUR_TEMP_DIRECTORY
- *   --runner=BlockingDataflowRunner
- *   --dataset=YOUR-DATASET
+ *   --tempLocation=YOUR_TEMP_DIRECTORY
+ *   --runner=YOUR_RUNNER
+ *   --output=YOUR_OUTPUT_DIRECTORY
  * }
  * </pre>
- * where the BigQuery dataset you specify must already exist.
  *
  * <p>Optionally include {@code --input} to specify the batch input file path.
  * To indicate a time after which the data should be filtered out, include the
@@ -107,39 +105,26 @@ public class HourlyTeamScore extends UserScore {
     @Default.String("2100-01-01-00-00")
     String getStopMin();
     void setStopMin(String value);
-
-    @Description("The BigQuery table name. Should not already exist.")
-    @Default.String("hourly_team_score")
-    String getHourlyTeamScoreTableName();
-    void setHourlyTeamScoreTableName(String value);
   }
 
   /**
-   * Create a map of information that describes how to write pipeline output to BigQuery. This map
-   * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+   * Create a map of information that describes how to write pipeline output to text. This map
+   * is passed to the {@link WriteToText} constructor to write team score sums and
    * includes information about window start time.
    */
-  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
-      configureWindowedTableWrite() {
-    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
-        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfig.put(
-        "team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", (c, w) -> c.element().getKey()));
-    tableConfig.put(
-        "total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "INTEGER", (c, w) -> c.element().getValue()));
-    tableConfig.put(
+  protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>>
+      configureOutput() {
+    Map<String, WriteToText.FieldFn<KV<String, Integer>>> config =
+        new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>();
+    config.put("team", (c, w) -> c.element().getKey());
+    config.put("total_score", (c, w) -> c.element().getValue());
+    config.put(
         "window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING",
-            (c, w) -> {
+        (c, w) -> {
               IntervalWindow window = (IntervalWindow) w;
               return fmt.print(window.start());
-            }));
-    return tableConfig;
+            });
+    return config;
   }
 
 
@@ -186,12 +171,10 @@ public class HourlyTeamScore extends UserScore {
       // Extract and sum teamname/score pairs from the event data.
       .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
       .apply("WriteTeamScoreSums",
-        new WriteWindowedToBigQuery<KV<String, Integer>>(
-            options.as(GcpOptions.class).getProject(),
-            options.getDataset(),
-            options.getHourlyTeamScoreTableName(),
-            configureWindowedTableWrite()));
-
+          new WriteToText<KV<String, Integer>>(
+              options.getOutput(),
+              configureOutput(),
+              true));
 
     pipeline.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index bfad9f6..f673a8d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -106,6 +106,11 @@ public class LeaderBoard extends HourlyTeamScore {
    */
   interface Options extends HourlyTeamScore.Options, ExampleOptions, StreamingOptions {
 
+    @Description("BigQuery Dataset to write tables to. Must already exist.")
+    @Validation.Required
+    String getDataset();
+    void setDataset(String value);
+
     @Description("Pub/Sub topic to read from")
     @Validation.Required
     String getTopic();
@@ -163,6 +168,27 @@ public class LeaderBoard extends HourlyTeamScore {
     return tableConfigure;
   }
 
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteToBigQuery} constructor to write (user, total_score) rows.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+  configureBigQueryWrite() {
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put(
+        "user",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> c.element().getKey())); // user = element key
+    tableConfigure.put(
+        "total_score",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "INTEGER", (c, w) -> c.element().getValue())); // total_score = element value
+    return tableConfigure;
+  }
+
+
   /**
    * Create a map of information that describes how to write pipeline output to BigQuery. This map
    * is used to write user score sums.

http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 8110146..7297bcd 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -20,11 +20,10 @@ package org.apache.beam.examples.complete.game;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.avro.reflect.Nullable;
-import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
+import org.apache.beam.examples.complete.game.utils.WriteToText;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -62,9 +61,9 @@ import org.slf4j.LoggerFactory;
  * the pipeline configuration like this:
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
- *   --tempLocation=gs://YOUR_TEMP_DIRECTORY
- *   --runner=BlockingDataflowRunner
- *   --dataset=YOUR-DATASET
+ *   --tempLocation=YOUR_TEMP_DIRECTORY
+ *   --runner=YOUR_RUNNER
+ *   --output=YOUR_OUTPUT_DIRECTORY
  * }
  * </pre>
  * where the BigQuery dataset you specify must already exist.
@@ -186,37 +185,26 @@ public class UserScore {
     String getInput();
     void setInput(String value);
 
-    @Description("BigQuery Dataset to write tables to. Must already exist.")
+    // Set this required option to specify where to write the output.
+    @Description("Path of the file to write to.")
     @Validation.Required
-    String getDataset();
-    void setDataset(String value);
-
-    @Description("The BigQuery table name. Should not already exist.")
-    @Default.String("user_score")
-    String getUserScoreTableName();
-    void setUserScoreTableName(String value);
+    String getOutput();
+    void setOutput(String value);
   }
 
   /**
-   * Create a map of information that describes how to write pipeline output to BigQuery. This map
-   * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
+   * Create a map of information that describes how to write pipeline output to text. This map
+   * is passed to the {@link WriteToText} constructor to write user score sums.
    */
-  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
-      configureBigQueryWrite() {
-    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
-        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfigure.put(
-        "user",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", (c, w) -> c.element().getKey()));
-    tableConfigure.put(
-        "total_score",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
-            "INTEGER", (c, w) -> c.element().getValue()));
-    return tableConfigure;
+  protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>>
+      configureOutput() {
+    Map<String, WriteToText.FieldFn<KV<String, Integer>>> config =
+        new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>();
+    config.put("user", (c, w) -> c.element().getKey());
+    config.put("total_score", (c, w) -> c.element().getValue());
+    return config;
   }
 
-
   /**
    * Run a batch pipeline.
    */
@@ -234,15 +222,13 @@ public class UserScore {
         .apply("ExtractUserScore", new ExtractAndSumScore("user"))
         .apply(
             "WriteUserScoreSums",
-            new WriteToBigQuery<KV<String, Integer>>(
-                options.as(GcpOptions.class).getProject(),
-                options.getDataset(),
-                options.getUserScoreTableName(),
-                configureBigQueryWrite()));
+            new WriteToText<KV<String, Integer>>(
+                options.getOutput(),
+                configureOutput(),
+                false));
 
     // Run the batch pipeline.
     pipeline.run().waitUntilFinish();
   }
   // [END DocInclude_USMain]
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
new file mode 100644
index 0000000..e6c8ddb
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Generate, format, and write rows as text. Use provided information about the field names, as
+ * well as lambda functions that describe how to generate their values.
+ */
+public class WriteToText<InputT>
+    extends PTransform<PCollection<InputT>, PDone> {
+
+  private static final DateTimeFormatter formatter = // Formats window bounds in file names.
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+  protected String filenamePrefix; // Output path prefix; may include a directory part.
+  protected Map<String, FieldFn<InputT>> fieldFn; // Field name -> value function.
+  protected boolean windowed; // If true, write one set of files per window.
+
+  public WriteToText() {
+  }
+
+  public WriteToText(
+      String filenamePrefix,
+      Map<String, FieldFn<InputT>> fieldFn,
+      boolean windowed) {
+    this.filenamePrefix = filenamePrefix;
+    this.fieldFn = fieldFn;
+    this.windowed = windowed;
+  }
+
+  /**
+   * A {@link Serializable} function from a {@link DoFn.ProcessContext}
+   * and {@link BoundedWindow} to the value for that field.
+   */
+  public interface FieldFn<InputT> extends Serializable {
+    Object apply(DoFn<InputT, String>.ProcessContext context, BoundedWindow window);
+  }
+
+  /** Convert each element into a "name: value, ..." text row as specified by fieldFn. */
+  protected class BuildRowFn extends DoFn<InputT, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      List<String> fields = new ArrayList<String>();
+      for (Map.Entry<String, FieldFn<InputT>> entry : fieldFn.entrySet()) {
+        String key = entry.getKey();
+        FieldFn<InputT> fcn = entry.getValue();
+        fields.add(key + ": " + fcn.apply(c, window));
+      }
+      String result = fields.stream().collect(Collectors.joining(", "));
+      c.output(result);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes elements to files with names deterministically derived
+   * from the lower and upper bounds of their window (an {@link IntervalWindow}).
+   */
+  protected class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
+
+    private final String filenamePrefix;
+
+    public WriteOneFilePerWindow(String filenamePrefix) {
+      this.filenamePrefix = filenamePrefix;
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      // Require IntervalWindows (checked by coder reference); PerWindowFiles casts to them.
+      checkArgument(
+          input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
+
+      // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+      // component from that path for the PerWindowFiles.
+      String prefix = "";
+      ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+      if (!resource.isDirectory()) {
+        prefix = verifyNotNull(
+            resource.getFilename(),
+            "A non-directory resource should have a non-null filename: %s",
+            resource);
+      }
+
+      return input.apply(
+          TextIO.write()
+              .to(resource.getCurrentDirectory())
+              .withFilenamePolicy(new PerWindowFiles(prefix))
+              .withWindowedWrites()
+              .withNumShards(3)); // NOTE(review): hard-coded shard count.
+    }
+  }
+
+  /**
+   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
+   * being written. This always includes the shard number and the total number of shards. For
+   * windowed writes, it also includes the window and pane index (a sequence number assigned to each
+   * trigger firing).
+   */
+  protected static class PerWindowFiles extends FilenamePolicy {
+
+    private final String prefix;
+
+    public PerWindowFiles(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public String filenamePrefixForWindow(IntervalWindow window) { // "<prefix>-<start>-<end>"
+      return String.format("%s-%s-%s",
+          prefix, formatter.print(window.start()), formatter.print(window.end()));
+    }
+
+    @Override
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
+      IntervalWindow window = (IntervalWindow) context.getWindow();
+      String filename = String.format(
+          "%s-%s-of-%s%s", // <window prefix>-<shard>-of-<numShards><extension>
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    }
+
+    @Override
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
+      throw new UnsupportedOperationException("Unsupported."); // used only with windowed writes
+    }
+  }
+
+  @Override
+  public PDone expand(PCollection<InputT> teamAndScore) {
+    if (windowed) { // one set of output files per window, named by window bounds
+      teamAndScore
+          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
+          .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
+    } else { // plain TextIO write to filenamePrefix
+      teamAndScore
+          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
+          .apply(TextIO.write().to(filenamePrefix));
+    }
+    return PDone.in(teamAndScore.getPipeline());
+  }
+}


[2/2] beam git commit: This closes #3024

Posted by al...@apache.org.
This closes #3024


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a4f7a9c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a4f7a9c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a4f7a9c3

Branch: refs/heads/master
Commit: a4f7a9c3f8997c3ff6dfe6cd6e1e42b451f8affe
Parents: c54670f 758fee8
Author: Ahmet Altay <al...@google.com>
Authored: Wed May 10 11:14:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed May 10 11:14:26 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/HourlyTeamScore.java |  57 ++----
 .../examples/complete/game/LeaderBoard.java     |  26 +++
 .../beam/examples/complete/game/UserScore.java  |  56 +++---
 .../complete/game/utils/WriteToText.java        | 184 +++++++++++++++++++
 4 files changed, 251 insertions(+), 72 deletions(-)
----------------------------------------------------------------------