You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:28:33 UTC
[45/50] [abbrv] incubator-beam git commit: Port some example utils from OldDoFn to DoFn
Port some example utils from OldDoFn to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bec5e03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bec5e03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bec5e03
Branch: refs/heads/python-sdk
Commit: 3bec5e03fc21c33080bb31488a849aae0a7ce9ef
Parents: 48fdd06
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 13:28:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 26 14:59:14 2016 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 52 +++++++++++---------
.../examples/complete/game/HourlyTeamScore.java | 28 ++++++-----
.../examples/complete/game/LeaderBoard.java | 43 +++++++++-------
.../beam/examples/complete/game/UserScore.java | 14 ++++--
.../complete/game/utils/WriteToBigQuery.java | 49 ++++++++++--------
.../game/utils/WriteWindowedToBigQuery.java | 14 +++---
6 files changed, 114 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index ba52e12..5ebf892 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -193,21 +193,26 @@ public class GameStats extends LeaderBoard {
configureWindowedWrite() {
Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfigure.put("team",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> c.element().getKey()));
- tableConfigure.put("total_score",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
- c -> c.element().getValue()));
- tableConfigure.put("window_start",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> {
- IntervalWindow w = (IntervalWindow) c.window();
- return fmt.print(w.start());
- }));
- tableConfigure.put("processing_time",
+ tableConfigure.put(
+ "team",
new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", c -> fmt.print(Instant.now())));
+ "STRING", (c, w) -> c.element().getKey()));
+ tableConfigure.put(
+ "total_score",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "INTEGER", (c, w) -> c.element().getValue()));
+ tableConfigure.put(
+ "window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING",
+ (c, w) -> {
+ IntervalWindow window = (IntervalWindow) w;
+ return fmt.print(window.start());
+ }));
+ tableConfigure.put(
+ "processing_time",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", (c, w) -> fmt.print(Instant.now())));
return tableConfigure;
}
@@ -220,14 +225,17 @@ public class GameStats extends LeaderBoard {
Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
- tableConfigure.put("window_start",
- new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
- c -> {
- IntervalWindow w = (IntervalWindow) c.window();
- return fmt.print(w.start());
- }));
- tableConfigure.put("mean_duration",
- new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+ tableConfigure.put(
+ "window_start",
+ new WriteWindowedToBigQuery.FieldInfo<Double>(
+ "STRING",
+ (c, w) -> {
+ IntervalWindow window = (IntervalWindow) w;
+ return fmt.print(window.start());
+ }));
+ tableConfigure.put(
+ "mean_duration",
+ new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", (c, w) -> c.element()));
return tableConfigure;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 1f92906..aefa3bc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -124,18 +124,22 @@ public class HourlyTeamScore extends UserScore {
configureWindowedTableWrite() {
Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfig.put("team",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> c.element().getKey()));
- tableConfig.put("total_score",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
- c -> c.element().getValue()));
- tableConfig.put("window_start",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> {
- IntervalWindow w = (IntervalWindow) c.window();
- return fmt.print(w.start());
- }));
+ tableConfig.put(
+ "team",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", (c, w) -> c.element().getKey()));
+ tableConfig.put(
+ "total_score",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "INTEGER", (c, w) -> c.element().getValue()));
+ tableConfig.put(
+ "window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING",
+ (c, w) -> {
+ IntervalWindow window = (IntervalWindow) w;
+ return fmt.print(window.start());
+ }));
return tableConfig;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 1eac26c..d5e3345 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -137,24 +137,30 @@ public class LeaderBoard extends HourlyTeamScore {
Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfigure.put("team",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> c.element().getKey()));
- tableConfigure.put("total_score",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
- c -> c.element().getValue()));
- tableConfigure.put("window_start",
- new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
- c -> {
- IntervalWindow w = (IntervalWindow) c.window();
- return fmt.print(w.start());
- }));
- tableConfigure.put("processing_time",
+ tableConfigure.put(
+ "team",
new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", c -> fmt.print(Instant.now())));
- tableConfigure.put("timing",
+ "STRING", (c, w) -> c.element().getKey()));
+ tableConfigure.put(
+ "total_score",
new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", c -> c.pane().getTiming().toString()));
+ "INTEGER", (c, w) -> c.element().getValue()));
+ tableConfigure.put(
+ "window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING",
+ (c, w) -> {
+ IntervalWindow window = (IntervalWindow) w;
+ return fmt.print(window.start());
+ }));
+ tableConfigure.put(
+ "processing_time",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", (c, w) -> fmt.print(Instant.now())));
+ tableConfigure.put(
+ "timing",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", (c, w) -> c.pane().getTiming().toString()));
return tableConfigure;
}
@@ -167,9 +173,10 @@ public class LeaderBoard extends HourlyTeamScore {
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
configureBigQueryWrite();
- tableConfigure.put("processing_time",
+ tableConfigure.put(
+ "processing_time",
new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
- "STRING", c -> fmt.print(Instant.now())));
+ "STRING", (c, w) -> fmt.print(Instant.now())));
return tableConfigure;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index db89702..f70b79c 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -202,13 +202,17 @@ public class UserScore {
* is passed to the {@link WriteToBigQuery} constructor to write user score sums.
*/
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
- configureBigQueryWrite() {
+ configureBigQueryWrite() {
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
- tableConfigure.put("user",
- new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
- tableConfigure.put("total_score",
- new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
+ tableConfigure.put(
+ "user",
+ new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", (c, w) -> c.element().getKey()));
+ tableConfigure.put(
+ "total_score",
+ new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+ "INTEGER", (c, w) -> c.element().getValue()));
return tableConfigure;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 40c4286..89fc271 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -32,10 +32,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -44,30 +44,38 @@ import org.apache.beam.sdk.values.PDone;
* the field names and types, as well as lambda functions that describe how to generate their
* values.
*/
-public class WriteToBigQuery<T>
- extends PTransform<PCollection<T>, PDone> {
+public class WriteToBigQuery<InputT>
+ extends PTransform<PCollection<InputT>, PDone> {
protected String tableName;
- protected Map<String, FieldInfo<T>> fieldInfo;
+ protected Map<String, FieldInfo<InputT>> fieldInfo;
public WriteToBigQuery() {
}
public WriteToBigQuery(String tableName,
- Map<String, FieldInfo<T>> fieldInfo) {
+ Map<String, FieldInfo<InputT>> fieldInfo) {
this.tableName = tableName;
this.fieldInfo = fieldInfo;
}
+ /**
+ * A {@link Serializable} function from a {@link DoFn.ProcessContext}
+ * and {@link BoundedWindow} to the value for that field.
+ */
+ public interface FieldFn<InputT> extends Serializable {
+ Object apply(DoFn<InputT, TableRow>.ProcessContext context, BoundedWindow window);
+ }
+
/** Define a class to hold information about output table field definitions. */
- public static class FieldInfo<T> implements Serializable {
+ public static class FieldInfo<InputT> implements Serializable {
// The BigQuery 'type' of the field
private String fieldType;
// A lambda function to generate the field value
- private SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn;
+ private FieldFn<InputT> fieldFn;
public FieldInfo(String fieldType,
- SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
+ FieldFn<InputT> fieldFn) {
this.fieldType = fieldType;
this.fieldFn = fieldFn;
}
@@ -76,23 +84,22 @@ public class WriteToBigQuery<T>
return this.fieldType;
}
- SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
+ FieldFn<InputT> getFieldFn() {
return this.fieldFn;
}
}
/** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
- protected class BuildRowFn extends OldDoFn<T, TableRow> {
+ protected class BuildRowFn extends DoFn<InputT, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
TableRow row = new TableRow();
- for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+ for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
- FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
- fcnInfo.getFieldFn();
- row.set(key, fcn.apply(c));
+ FieldInfo<InputT> fcnInfo = entry.getValue();
+ FieldFn<InputT> fcn = fcnInfo.getFieldFn();
+ row.set(key, fcn.apply(c, window));
}
c.output(row);
}
@@ -101,9 +108,9 @@ public class WriteToBigQuery<T>
/** Build the output table schema. */
protected TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
- for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+ for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
- FieldInfo<T> fcnInfo = entry.getValue();
+ FieldInfo<InputT> fcnInfo = entry.getValue();
String bqType = fcnInfo.getFieldType();
fields.add(new TableFieldSchema().setName(key).setType(bqType));
}
@@ -111,7 +118,7 @@ public class WriteToBigQuery<T>
}
@Override
- public PDone apply(PCollection<T> teamAndScore) {
+ public PDone apply(PCollection<InputT> teamAndScore) {
return teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.Write
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bec5e03/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 09f3b6c..4f2e719 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -22,10 +22,10 @@ import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -43,19 +43,17 @@ public class WriteWindowedToBigQuery<T>
}
/** Convert each key/score pair into a BigQuery TableRow. */
- protected class BuildRowFn extends OldDoFn<T, TableRow>
+ protected class BuildRowFn extends DoFn<T, TableRow>
implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
TableRow row = new TableRow();
for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn =
- fcnInfo.getFieldFn();
- row.set(key, fcn.apply(c));
+ row.set(key, fcnInfo.getFieldFn().apply(c, window));
}
c.output(row);
}