You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was lost in the plain-text conversion).
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/26 22:08:04 UTC

[2/2] incubator-beam git commit: Port some example utils from OldDoFn to DoFn

Port some example utils from OldDoFn to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bec5e03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bec5e03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bec5e03

Branch: refs/heads/master
Commit: 3bec5e03fc21c33080bb31488a849aae0a7ce9ef
Parents: 48fdd06
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 13:28:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 26 14:59:14 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  | 52 +++++++++++---------
 .../examples/complete/game/HourlyTeamScore.java | 28 ++++++-----
 .../examples/complete/game/LeaderBoard.java     | 43 +++++++++-------
 .../beam/examples/complete/game/UserScore.java  | 14 ++++--
 .../complete/game/utils/WriteToBigQuery.java    | 49 ++++++++++--------
 .../game/utils/WriteWindowedToBigQuery.java     | 14 +++---
 6 files changed, 114 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index ba52e12..5ebf892 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -193,21 +193,26 @@ public class GameStats extends LeaderBoard {
       configureWindowedWrite() {
     Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
         new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfigure.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-          c -> {
-            IntervalWindow w = (IntervalWindow) c.window();
-            return fmt.print(w.start());
-          }));
-    tableConfigure.put("processing_time",
+    tableConfigure.put(
+        "team",
         new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> fmt.print(Instant.now())));
+            "STRING", (c, w) -> c.element().getKey()));
+    tableConfigure.put(
+        "total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "INTEGER", (c, w) -> c.element().getValue()));
+    tableConfigure.put(
+        "window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING",
+            (c, w) -> {
+              IntervalWindow window = (IntervalWindow) w;
+              return fmt.print(window.start());
+            }));
+    tableConfigure.put(
+        "processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> fmt.print(Instant.now())));
     return tableConfigure;
   }
 
@@ -220,14 +225,17 @@ public class GameStats extends LeaderBoard {
 
     Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
         new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
-          c -> {
-            IntervalWindow w = (IntervalWindow) c.window();
-            return fmt.print(w.start());
-          }));
-    tableConfigure.put("mean_duration",
-        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+    tableConfigure.put(
+        "window_start",
+        new WriteWindowedToBigQuery.FieldInfo<Double>(
+            "STRING",
+            (c, w) -> {
+              IntervalWindow window = (IntervalWindow) w;
+              return fmt.print(window.start());
+            }));
+    tableConfigure.put(
+        "mean_duration",
+        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", (c, w) -> c.element()));
     return tableConfigure;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 1f92906..aefa3bc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -124,18 +124,22 @@ public class HourlyTeamScore extends UserScore {
       configureWindowedTableWrite() {
     Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
         new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfig.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfig.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    tableConfig.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-          c -> {
-            IntervalWindow w = (IntervalWindow) c.window();
-            return fmt.print(w.start());
-          }));
+    tableConfig.put(
+        "team",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> c.element().getKey()));
+    tableConfig.put(
+        "total_score",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "INTEGER", (c, w) -> c.element().getValue()));
+    tableConfig.put(
+        "window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING",
+            (c, w) -> {
+              IntervalWindow window = (IntervalWindow) w;
+              return fmt.print(window.start());
+            }));
     return tableConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 1eac26c..d5e3345 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -137,24 +137,30 @@ public class LeaderBoard extends HourlyTeamScore {
 
     Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
         new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfigure.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-          c -> {
-            IntervalWindow w = (IntervalWindow) c.window();
-            return fmt.print(w.start());
-          }));
-    tableConfigure.put("processing_time",
+    tableConfigure.put(
+        "team",
         new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> fmt.print(Instant.now())));
-    tableConfigure.put("timing",
+            "STRING", (c, w) -> c.element().getKey()));
+    tableConfigure.put(
+        "total_score",
         new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> c.pane().getTiming().toString()));
+            "INTEGER", (c, w) -> c.element().getValue()));
+    tableConfigure.put(
+        "window_start",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING",
+            (c, w) -> {
+              IntervalWindow window = (IntervalWindow) w;
+              return fmt.print(window.start());
+            }));
+    tableConfigure.put(
+        "processing_time",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> fmt.print(Instant.now())));
+    tableConfigure.put(
+        "timing",
+        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> c.pane().getTiming().toString()));
     return tableConfigure;
   }
 
@@ -167,9 +173,10 @@ public class LeaderBoard extends HourlyTeamScore {
 
     Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
         configureBigQueryWrite();
-    tableConfigure.put("processing_time",
+    tableConfigure.put(
+        "processing_time",
         new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> fmt.print(Instant.now())));
+            "STRING", (c, w) -> fmt.print(Instant.now())));
     return tableConfigure;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index db89702..f70b79c 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -202,13 +202,17 @@ public class UserScore {
    * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
    */
   protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
-    configureBigQueryWrite() {
+      configureBigQueryWrite() {
     Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
         new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfigure.put("user",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
+    tableConfigure.put(
+        "user",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "STRING", (c, w) -> c.element().getKey()));
+    tableConfigure.put(
+        "total_score",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+            "INTEGER", (c, w) -> c.element().getValue()));
     return tableConfigure;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 40c4286..89fc271 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -32,10 +32,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -44,30 +44,38 @@ import org.apache.beam.sdk.values.PDone;
  * the field names and types, as well as lambda functions that describe how to generate their
  * values.
  */
-public class WriteToBigQuery<T>
-    extends PTransform<PCollection<T>, PDone> {
+public class WriteToBigQuery<InputT>
+    extends PTransform<PCollection<InputT>, PDone> {
 
   protected String tableName;
-  protected Map<String, FieldInfo<T>> fieldInfo;
+  protected Map<String, FieldInfo<InputT>> fieldInfo;
 
   public WriteToBigQuery() {
   }
 
   public WriteToBigQuery(String tableName,
-      Map<String, FieldInfo<T>> fieldInfo) {
+      Map<String, FieldInfo<InputT>> fieldInfo) {
     this.tableName = tableName;
     this.fieldInfo = fieldInfo;
   }
 
+  /**
+   * A {@link Serializable} function from a {@link DoFn.ProcessContext}
+   * and {@link BoundedWindow} to the value for that field.
+   */
+  public interface FieldFn<InputT> extends Serializable {
+    Object apply(DoFn<InputT, TableRow>.ProcessContext context, BoundedWindow window);
+  }
+
   /** Define a class to hold information about output table field definitions. */
-  public static class FieldInfo<T> implements Serializable {
+  public static class FieldInfo<InputT> implements Serializable {
     // The BigQuery 'type' of the field
     private String fieldType;
     // A lambda function to generate the field value
-    private SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn;
+    private FieldFn<InputT> fieldFn;
 
     public FieldInfo(String fieldType,
-        SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
+        FieldFn<InputT> fieldFn) {
       this.fieldType = fieldType;
       this.fieldFn = fieldFn;
     }
@@ -76,23 +84,22 @@ public class WriteToBigQuery<T>
       return this.fieldType;
     }
 
-    SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
+    FieldFn<InputT> getFieldFn() {
       return this.fieldFn;
     }
   }
   /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
-  protected class BuildRowFn extends OldDoFn<T, TableRow> {
+  protected class BuildRowFn extends DoFn<InputT, TableRow> {
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
 
       TableRow row = new TableRow();
-      for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+      for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
           String key = entry.getKey();
-          FieldInfo<T> fcnInfo = entry.getValue();
-          SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
-            fcnInfo.getFieldFn();
-          row.set(key, fcn.apply(c));
+          FieldInfo<InputT> fcnInfo = entry.getValue();
+          FieldFn<InputT> fcn = fcnInfo.getFieldFn();
+          row.set(key, fcn.apply(c, window));
         }
       c.output(row);
     }
@@ -101,9 +108,9 @@ public class WriteToBigQuery<T>
   /** Build the output table schema. */
   protected TableSchema getSchema() {
     List<TableFieldSchema> fields = new ArrayList<>();
-    for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+    for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
       String key = entry.getKey();
-      FieldInfo<T> fcnInfo = entry.getValue();
+      FieldInfo<InputT> fcnInfo = entry.getValue();
       String bqType = fcnInfo.getFieldType();
       fields.add(new TableFieldSchema().setName(key).setType(bqType));
     }
@@ -111,7 +118,7 @@ public class WriteToBigQuery<T>
   }
 
   @Override
-  public PDone apply(PCollection<T> teamAndScore) {
+  public PDone apply(PCollection<InputT> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 09f3b6c..4f2e719 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -22,10 +22,10 @@ import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -43,19 +43,17 @@ public class WriteWindowedToBigQuery<T>
   }
 
   /** Convert each key/score pair into a BigQuery TableRow. */
-  protected class BuildRowFn extends OldDoFn<T, TableRow>
+  protected class BuildRowFn extends DoFn<T, TableRow>
       implements RequiresWindowAccess {
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
 
       TableRow row = new TableRow();
       for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
           String key = entry.getKey();
           FieldInfo<T> fcnInfo = entry.getValue();
-          SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
-            fcnInfo.getFieldFn();
-          row.set(key, fcn.apply(c));
+          row.set(key, fcnInfo.getFieldFn().apply(c, window));
         }
       c.output(row);
     }