You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:34 UTC
[39/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
index ad5b53a..90073c1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -40,93 +40,98 @@ import java.util.Arrays;
* Session window test
*/
public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
- protected String resultPath;
-
- public TopWikipediaSessionsITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "user: user1 value:3",
- "user: user1 value:1",
- "user: user2 value:4",
- "user: user2 value:6",
- "user: user3 value:7",
- "user: user3 value:2"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForStreaming();
-
- Long now = (System.currentTimeMillis() + 10000) / 1000;
-
- PCollection<KV<String, Long>> output =
- p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now)
- .set("contributor_username", "user3"))))
-
-
-
- .apply(ParDo.of(new DoFn<TableRow, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- TableRow row = c.element();
- long timestamp = (Integer) row.get("timestamp");
- String userName = (String) row.get("contributor_username");
- if (userName != null) {
- // Sets the timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
- }
- }
- }))
-
- .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
- .apply(Count.<String>perElement());
-
- PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KV<String, Long> el = c.element();
- String out = "user: " + el.getKey() + " value:" + el.getValue();
- c.output(out);
- }
- }));
-
- format.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  /** Builds one input row holding an epoch-seconds timestamp and a contributor name. */
+  private static TableRow newRow(long timestamp, String user) {
+    return new TableRow().set("timestamp", timestamp).set("contributor_username", user);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    // Base time in epoch seconds, 10 seconds ahead of the wall clock.
+    long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    PCollection<KV<String, Long>> output =
+        p.apply(Create.of(Arrays.asList(
+            newRow(now, "user1"),
+            newRow(now + 10, "user3"),
+            newRow(now, "user2"),
+            newRow(now, "user1"),
+            newRow(now + 2, "user1"),
+            newRow(now, "user2"),
+            newRow(now + 1, "user2"),
+            newRow(now + 5, "user2"),
+            newRow(now + 7, "user2"),
+            newRow(now + 8, "user2"),
+            newRow(now + 200, "user2"),
+            newRow(now + 230, "user1"),
+            newRow(now + 230, "user2"),
+            newRow(now + 240, "user2"),
+            newRow(now + 245, "user3"),
+            newRow(now + 235, "user3"),
+            newRow(now + 236, "user3"),
+            newRow(now + 237, "user3"),
+            newRow(now + 238, "user3"),
+            newRow(now + 239, "user3"),
+            newRow(now + 240, "user3"),
+            newRow(now + 241, "user2"),
+            newRow(now, "user3"))))
+        // Emit each contributor name stamped with its event time (seconds -> millis).
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            TableRow row = c.element();
+            // Rows are built with Long values but may come back as another Number type
+            // (e.g. Integer) after encoding; a hard (Integer) cast would fail on a Long.
+            long timestamp = ((Number) row.get("timestamp")).longValue();
+            String userName = (String) row.get("contributor_username");
+            if (userName != null) {
+              // Sets the timestamp field to be used in windowing.
+              c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+            }
+          }
+        }))
+        // Merge each user's elements into session windows with a one-minute gap.
+        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+        .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        KV<String, Long> el = c.element();
+        String out = "user: " + el.getKey() + " value:" + el.getValue();
+        c.output(out);
+      }
+    }));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
index aa5623d..b1ccee4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
@@ -38,121 +38,125 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
*/
public class JoinExamples {
- // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
- private static final String GDELT_EVENTS_TABLE =
- "clouddataflow-readonly:samples.gdelt_sample";
- // A table that maps country codes to country names.
- private static final String COUNTRY_CODES =
- "gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
- /**
- * Join two collections, using country code as the key.
- */
- public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
- PCollection<TableRow> countryCodes) throws Exception {
-
- final TupleTag<String> eventInfoTag = new TupleTag<>();
- final TupleTag<String> countryInfoTag = new TupleTag<>();
-
- // transform both input collections to tuple collections, where the keys are country
- // codes in both cases.
- PCollection<KV<String, String>> eventInfo = eventsTable.apply(
- ParDo.of(new ExtractEventDataFn()));
- PCollection<KV<String, String>> countryInfo = countryCodes.apply(
- ParDo.of(new ExtractCountryInfoFn()));
-
- // country code 'key' -> CGBKR (<event info>, <country name>)
- PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
- .of(eventInfoTag, eventInfo)
- .and(countryInfoTag, countryInfo)
- .apply(CoGroupByKey.<String>create());
-
- // Process the CoGbkResult elements generated by the CoGroupByKey transform.
- // country code 'key' -> string of <event info>, <country name>
- PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<String, CoGbkResult> e = c.element();
- CoGbkResult val = e.getValue();
- String countryCode = e.getKey();
- String countryName;
- countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
- for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
- // Generate a string that combines information from both collection values
- c.output(KV.of(countryCode, "Country name: " + countryName
- + ", Event info: " + eventInfo));
- }
- }
- }));
-
- // write to GCS
- return finalResultCollection
- .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
- @Override
- public void processElement(ProcessContext c) {
- String outputstring = "Country code: " + c.element().getKey()
- + ", " + c.element().getValue();
- c.output(outputstring);
- }
- }));
- }
-
- /**
- * Examines each row (event) in the input table. Output a KV with the key the country
- * code of the event, and the value a string encoding event information.
- */
- static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- String countryCode = (String) row.get("ActionGeo_CountryCode");
- String sqlDate = (String) row.get("SQLDATE");
- String actor1Name = (String) row.get("Actor1Name");
- String sourceUrl = (String) row.get("SOURCEURL");
- String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
- c.output(KV.of(countryCode, eventInfo));
- }
- }
-
-
- /**
- * Examines each row (country info) in the input table. Output a KV with the key the country
- * code, and the value the country name.
- */
- static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- String countryCode = (String) row.get("FIPSCC");
- String countryName = (String) row.get("HumanName");
- c.output(KV.of(countryCode, countryName));
- }
- }
-
-
- /**
- * Options supported by {@link JoinExamples}.
- * <p>
- * Inherits standard configuration options.
- */
- private interface Options extends PipelineOptions {
- @Description("Path of the file to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- Pipeline p = Pipeline.create(options);
- // the following two 'applys' create multiple inputs to our pipeline, one for each
- // of our two input sources.
- PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
- PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
- PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
- formattedResults.apply(TextIO.Write.to(options.getOutput()));
- p.run();
- }
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   *
+   * @param eventsTable event rows, keyed by their {@code ActionGeo_CountryCode} field
+   * @param countryCodes country rows, keyed by their {@code FIPSCC} field
+   * @return one formatted line per event, including the matching country name
+   */
+  public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<>();
+    final TupleTag<String> countryInfoTag = new TupleTag<>();
+
+    // transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            CoGbkResult val = e.getValue();
+            String countryCode = e.getKey();
+            // Single country name for this code, or "Kostas" when no country row matched.
+            // NOTE(review): "Kostas" looks like a placeholder default; confirm before changing.
+            String countryName = val.getOnly(countryInfoTag, "Kostas");
+            for (String eventInfo : val.getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                  + ", Event info: " + eventInfo));
+            }
+          }
+        }));
+
+    // Format each (country code, joined info) pair as one output line.
+    return finalResultCollection
+        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputstring = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputstring);
+          }
+        }));
+  }
+
+  /**
+   * Examines each row (event) in the input table. Output a KV with the key the country
+   * code of the event, and the value a string encoding event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+  /**
+   * Examines each row (country info) in the input table. Output a KV with the key the country
+   * code, and the value the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  /** Reads both BigQuery tables, joins them, and writes the formatted lines to the output path. */
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // the following two 'applys' create multiple inputs to our pipeline, one for each
+    // of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
}