You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:34 UTC

[39/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
index ad5b53a..90073c1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -40,93 +40,93 @@ import java.util.Arrays;
  * Session window test
  */
 public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
-	protected String resultPath;
-
-	public TopWikipediaSessionsITCase(){
-	}
-
-	static final String[] EXPECTED_RESULT = new String[] {
-			"user: user1 value:3",
-			"user: user1 value:1",
-			"user: user2 value:4",
-			"user: user2 value:6",
-			"user: user3 value:7",
-			"user: user3 value:2"
-	};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createForStreaming();
-
-		Long now = (System.currentTimeMillis() + 10000) / 1000;
-
-		PCollection<KV<String, Long>> output =
-			p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now)
-					.set("contributor_username", "user3"))))
-
-
-
-			.apply(ParDo.of(new DoFn<TableRow, String>() {
-				@Override
-				public void processElement(ProcessContext c) throws Exception {
-					TableRow row = c.element();
-					long timestamp = (Integer) row.get("timestamp");
-					String userName = (String) row.get("contributor_username");
-					if (userName != null) {
-						// Sets the timestamp field to be used in windowing.
-						c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-					}
-				}
-			}))
-
-			.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
-			.apply(Count.<String>perElement());
-
-		PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
-			@Override
-			public void processElement(ProcessContext c) throws Exception {
-				KV<String, Long> el = c.element();
-				String out = "user: " + el.getKey() + " value:" + el.getValue();
-				c.output(out);
-			}
-		}));
-
-		format.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    Long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    PCollection<KV<String, Long>> output =
+      p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now)
+          .set("contributor_username", "user3"))))
+
+
+
+      .apply(ParDo.of(new DoFn<TableRow, String>() {
+        @Override
+        public void processElement(ProcessContext c) throws Exception {
+          TableRow row = c.element();
+          long timestamp = (Integer) row.get("timestamp");
+          String userName = (String) row.get("contributor_username");
+          if (userName != null) {
+            // Sets the timestamp field to be used in windowing.
+            c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+          }
+        }
+      }))
+
+      .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+
+      .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        KV<String, Long> el = c.element();
+        String out = "user: " + el.getKey() + " value:" + el.getValue();
+        c.output(out);
+      }
+    }));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
index aa5623d..b1ccee4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
@@ -38,121 +38,121 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
  */
 public class JoinExamples {
 
-	// A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
-	private static final String GDELT_EVENTS_TABLE =
-			"clouddataflow-readonly:samples.gdelt_sample";
-	// A table that maps country codes to country names.
-	private static final String COUNTRY_CODES =
-			"gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
-	/**
-	 * Join two collections, using country code as the key.
-	 */
-	public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
-	                                      PCollection<TableRow> countryCodes) throws Exception {
-
-		final TupleTag<String> eventInfoTag = new TupleTag<>();
-		final TupleTag<String> countryInfoTag = new TupleTag<>();
-
-		// transform both input collections to tuple collections, where the keys are country
-		// codes in both cases.
-		PCollection<KV<String, String>> eventInfo = eventsTable.apply(
-				ParDo.of(new ExtractEventDataFn()));
-		PCollection<KV<String, String>> countryInfo = countryCodes.apply(
-				ParDo.of(new ExtractCountryInfoFn()));
-
-		// country code 'key' -> CGBKR (<event info>, <country name>)
-		PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-				.of(eventInfoTag, eventInfo)
-				.and(countryInfoTag, countryInfo)
-				.apply(CoGroupByKey.<String>create());
-
-		// Process the CoGbkResult elements generated by the CoGroupByKey transform.
-		// country code 'key' -> string of <event info>, <country name>
-		PCollection<KV<String, String>> finalResultCollection =
-				kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-					@Override
-					public void processElement(ProcessContext c) {
-						KV<String, CoGbkResult> e = c.element();
-						CoGbkResult val = e.getValue();
-						String countryCode = e.getKey();
-						String countryName;
-						countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
-						for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
-							// Generate a string that combines information from both collection values
-							c.output(KV.of(countryCode, "Country name: " + countryName
-									+ ", Event info: " + eventInfo));
-						}
-					}
-				}));
-
-		// write to GCS
-		return finalResultCollection
-				.apply(ParDo.of(new DoFn<KV<String, String>, String>() {
-					@Override
-					public void processElement(ProcessContext c) {
-						String outputstring = "Country code: " + c.element().getKey()
-								+ ", " + c.element().getValue();
-						c.output(outputstring);
-					}
-				}));
-	}
-
-	/**
-	 * Examines each row (event) in the input table. Output a KV with the key the country
-	 * code of the event, and the value a string encoding event information.
-	 */
-	static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
-		@Override
-		public void processElement(ProcessContext c) {
-			TableRow row = c.element();
-			String countryCode = (String) row.get("ActionGeo_CountryCode");
-			String sqlDate = (String) row.get("SQLDATE");
-			String actor1Name = (String) row.get("Actor1Name");
-			String sourceUrl = (String) row.get("SOURCEURL");
-			String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
-			c.output(KV.of(countryCode, eventInfo));
-		}
-	}
-
-
-	/**
-	 * Examines each row (country info) in the input table. Output a KV with the key the country
-	 * code, and the value the country name.
-	 */
-	static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
-		@Override
-		public void processElement(ProcessContext c) {
-			TableRow row = c.element();
-			String countryCode = (String) row.get("FIPSCC");
-			String countryName = (String) row.get("HumanName");
-			c.output(KV.of(countryCode, countryName));
-		}
-	}
-
-
-	/**
-	 * Options supported by {@link JoinExamples}.
-	 * <p>
-	 * Inherits standard configuration options.
-	 */
-	private interface Options extends PipelineOptions {
-		@Description("Path of the file to write to")
-		@Validation.Required
-		String getOutput();
-		void setOutput(String value);
-	}
-
-	public static void main(String[] args) throws Exception {
-		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-		Pipeline p = Pipeline.create(options);
-		// the following two 'applys' create multiple inputs to our pipeline, one for each
-		// of our two input sources.
-		PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-		PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
-		PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-		formattedResults.apply(TextIO.Write.to(options.getOutput()));
-		p.run();
-	}
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   */
+  public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+                                        PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<>();
+    final TupleTag<String> countryInfoTag = new TupleTag<>();
+
+    // transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            CoGbkResult val = e.getValue();
+            String countryCode = e.getKey();
+            String countryName;
+            countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
+            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                  + ", Event info: " + eventInfo));
+            }
+          }
+        }));
+
+    // write to GCS
+    return finalResultCollection
+        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputstring = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputstring);
+          }
+        }));
+  }
+
+  /**
+   * Examines each row (event) in the input table. Output a KV with the key the country
+   * code of the event, and the value a string encoding event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table. Output a KV with the key the country
+   * code, and the value the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // the following two 'applys' create multiple inputs to our pipeline, one for each
+    // of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
 
 }