Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:10 UTC

[15/50] [abbrv] incubator-beam git commit: [tests] integrate Wikipedia session test

[tests] integrate Wikipedia session test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37a9b292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37a9b292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37a9b292

Branch: refs/heads/master
Commit: 37a9b292d7895897225f7484bc18d0d2db55f547
Parents: 3227fcc
Author: Max <ma...@posteo.de>
Authored: Tue Feb 23 08:30:34 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/TopWikipediaSessions.java    | 210 ------------------
 .../dataflow/TopWikipediaSessionsITCase.java    | 215 +++++++++++++------
 2 files changed, 144 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
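For context: the rewritten ITCase below exercises session windows from the
(pre-Beam) Google Dataflow SDK on the Flink streaming runner. The core
fragment, sketched here with illustrative variable names, groups each user's
timestamped edits into sessions that close after a one-minute gap and counts
the edits per session:

	PCollection<String> userEdits = ...; // one element per edit: the user name, carrying an event timestamp
	PCollection<KV<String, Long>> sessionCounts = userEdits
			.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
			.apply(Count.<String>perElement());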


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
deleted file mode 100644
index ab5565a..0000000
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
+++ /dev/null
@@ -1,210 +0,0 @@
-///*
-// * Copyright (C) 2015 Google Inc.
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// * use this file except in compliance with the License. You may obtain a copy of
-// * the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// * License for the specific language governing permissions and limitations under
-// * the License.
-// */
-//
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.Default;
-//import com.google.cloud.dataflow.sdk.options.Description;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-//import com.google.cloud.dataflow.sdk.options.Validation;
-//import com.google.cloud.dataflow.sdk.transforms.Count;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-//import com.google.cloud.dataflow.sdk.transforms.PTransform;
-//import com.google.cloud.dataflow.sdk.transforms.ParDo;
-//import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-//import com.google.cloud.dataflow.sdk.transforms.Top;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-//import com.google.cloud.dataflow.sdk.values.KV;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//
-//import org.joda.time.Duration;
-//import org.joda.time.Instant;
-//
-//import java.util.List;
-//
-///**
-// * Copied from {@link com.google.cloud.dataflow.examples.complete.TopWikipediaSessions} because the code
-// * is private there.
-// */
-//public class TopWikipediaSessions {
-//	private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
-//
-//	/**
-//	 * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
-//	 */
-//	static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
-//		private static final long serialVersionUID = 0;
-//
-//		@Override
-//		public void processElement(ProcessContext c) {
-//			TableRow row = c.element();
-//			int timestamp = (Integer) row.get("timestamp");
-//			String userName = (String) row.get("contributor_username");
-//			if (userName != null) {
-//				// Sets the implicit timestamp field to be used in windowing.
-//				c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-//			}
-//		}
-//	}
-//
-//	/**
-//	 * Computes the number of edits in each user session.  A session is defined as
-//	 * a string of edits where each is separated from the next by less than an hour.
-//	 */
-//	static class ComputeSessions
-//			extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
-//		private static final long serialVersionUID = 0;
-//
-//		@Override
-//		public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
-//			return actions
-//					.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-//
-//					.apply(Count.<String>perElement());
-//		}
-//	}
-//
-//	/**
-//	 * Computes the longest session ending in each month.
-//	 */
-//	private static class TopPerMonth
-//			extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
-//		private static final long serialVersionUID = 0;
-//
-//		@Override
-//		public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
-//			return sessions
-//					.apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
-//
-//					.apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
-//						private static final long serialVersionUID = 0;
-//
-//						@Override
-//						public int compare(KV<String, Long> o1, KV<String, Long> o2) {
-//							return Long.compare(o1.getValue(), o2.getValue());
-//						}
-//					}).withoutDefaults());
-//		}
-//	}
-//
-//	static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
-//			implements RequiresWindowAccess {
-//
-//		private static final long serialVersionUID = 0;
-//
-//		@Override
-//		public void processElement(ProcessContext c) {
-//			c.output(KV.of(
-//					c.element().getKey() + " : " + c.window(), c.element().getValue()));
-//		}
-//	}
-//
-//	static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
-//			implements RequiresWindowAccess {
-//		private static final long serialVersionUID = 0;
-//
-//		@Override
-//		public void processElement(ProcessContext c) {
-//			for (KV<String, Long> item : c.element()) {
-//				String session = item.getKey();
-//				long count = item.getValue();
-//				c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
-//			}
-//		}
-//	}
-//
-//	static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
-//
-//		private static final long serialVersionUID = 0;
-//
-//		private final double samplingThreshold;
-//
-//		public ComputeTopSessions(double samplingThreshold) {
-//			this.samplingThreshold = samplingThreshold;
-//		}
-//
-//		@Override
-//		public PCollection<String> apply(PCollection<TableRow> input) {
-//			return input
-//					.apply(ParDo.of(new ExtractUserAndTimestamp()))
-//
-//					.apply(ParDo.named("SampleUsers").of(
-//							new DoFn<String, String>() {
-//								private static final long serialVersionUID = 0;
-//
-//								@Override
-//								public void processElement(ProcessContext c) {
-//									if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
-//										c.output(c.element());
-//									}
-//								}
-//							}))
-//
-//					.apply(new ComputeSessions())
-//
-//					.apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
-//					.apply(new TopPerMonth())
-//					.apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
-//		}
-//	}
-//
-//	/**
-//	 * Options supported by this class.
-//	 *
-//	 * <p> Inherits standard Dataflow configuration options.
-//	 */
-//	private static interface Options extends PipelineOptions {
-//		@Description(
-//				"Input specified as a GCS path containing a BigQuery table exported as json")
-//		@Default.String(EXPORTED_WIKI_TABLE)
-//		String getInput();
-//		void setInput(String value);
-//
-//		@Description("File to output results to")
-//		@Validation.Required
-//		String getOutput();
-//		void setOutput(String value);
-//	}
-//
-//	public static void main(String[] args) {
-//		Options options = PipelineOptionsFactory.fromArgs(args)
-//				.withValidation()
-//				.as(Options.class);
-//		DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-//
-//		Pipeline p = Pipeline.create(dataflowOptions);
-//
-//		double samplingThreshold = 0.1;
-//
-//		p.apply(TextIO.Read
-//				.from(options.getInput())
-//				.withCoder(TableRowJsonCoder.of()))
-//				.apply(new ComputeTopSessions(samplingThreshold))
-//				.apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-//
-//		p.run();
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
index 9c8147b..eb020c5 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
@@ -1,71 +1,144 @@
-///*
-// * Copyright 2015 Data Artisans GmbH
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.transforms.Create;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//import com.google.common.base.Joiner;
-//import org.apache.flink.test.util.JavaProgramTestBase;
-//
-//import java.util.Arrays;
-//
-//public class TopWikipediaSessionsITCase extends JavaProgramTestBase {
-//	protected String resultPath;
-//
-//	public TopWikipediaSessionsITCase(){
-//	}
-//
-//	static final String[] EXPECTED_RESULT = new String[] {
-//			"user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)"
-//					+ " : 3 : 1970-01-01T00:00:00.000Z",
-//			"user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)"
-//					+ " : 1 : 1970-02-01T00:00:00.000Z" };
-//
-//	@Override
-//	protected void preSubmit() throws Exception {
-//		resultPath = getTempDirPath("result");
-//	}
-//
-//	@Override
-//	protected void postSubmit() throws Exception {
-//		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-//	}
-//
-//	@Override
-//	protected void testProgram() throws Exception {
-//
-//		Pipeline p = FlinkTestPipeline.create();
-//
-//		PCollection<String> output =
-//				p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", 0).set
-//						("contributor_username", "user1"), new TableRow().set("timestamp", 1).set
-//						("contributor_username", "user1"), new TableRow().set("timestamp", 2).set
-//						("contributor_username", "user1"), new TableRow().set("timestamp", 0).set
-//						("contributor_username", "user2"), new TableRow().set("timestamp", 1).set
-//						("contributor_username", "user2"), new TableRow().set("timestamp", 3601).set
-//						("contributor_username", "user2"), new TableRow().set("timestamp", 3602).set
-//						("contributor_username", "user2"), new TableRow().set("timestamp", 35 * 24 * 3600)
-//						.set("contributor_username", "user3"))))
-//						.apply(new TopWikipediaSessions.ComputeTopSessions(1.0));
-//
-//		output.apply(TextIO.Write.to(resultPath));
-//
-//		p.run();
-//	}
-//}
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Integration test for session windows on the Flink streaming runner.
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+	protected String resultPath;
+
+	public TopWikipediaSessionsITCase(){
+	}
+
+	static final String[] EXPECTED_RESULT = new String[] {
+			"user: user1 value:3",
+			"user: user1 value:1",
+			"user: user2 value:4",
+			"user: user2 value:6",
+			"user: user3 value:7",
+			"user: user3 value:2"
+	};
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		Pipeline p = FlinkTestPipeline.createStreaming();
+
+		// Event timestamps are based on the current time, shifted slightly into the future.
+		long now = System.currentTimeMillis() + 10000;
+
+		PCollection<KV<String, Long>> output =
+			p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
+					("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now).set
+					("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
+					("contributor_username", "user1"), new TableRow().set("timestamp", now).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
+					("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
+					("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
+					("contributor_username", "user2"), new TableRow().set("timestamp", now)
+					.set("contributor_username", "user3"))))
+
+
+
+			.apply(ParDo.of(new DoFn<TableRow, String>() {
+				@Override
+				public void processElement(ProcessContext c) throws Exception {
+					TableRow row = c.element();
+					long timestamp = (Long) row.get("timestamp");
+					String userName = (String) row.get("contributor_username");
+					if (userName != null) {
+						// Interpret the value as seconds and attach it as the element timestamp for windowing.
+						c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+					}
+				}
+			}))
+
+			.apply(ParDo.named("SampleUsers").of(
+					new DoFn<String, String>() {
+						private static final long serialVersionUID = 0;
+
+						@Override
+						public void processElement(ProcessContext c) {
+							if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) { // threshold 1.0 keeps every element
+								c.output(c.element());
+							}
+						}
+					}))
+
+					.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+					.apply(Count.<String>perElement());
+
+		PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+			@Override
+			public void processElement(ProcessContext c) throws Exception {
+				KV<String, Long> el = c.element();
+				String out = "user: " + el.getKey() + " value:" + el.getValue();
+				System.out.println(out);
+				c.output(out);
+			}
+		}));
+
+		format.apply(TextIO.Write.to(resultPath));
+
+		p.run();
+	}
+}
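
For reference, EXPECTED_RESULT above can be reproduced by hand: timestamps are
interpreted as seconds, so with a one-minute gap each user's edits split into
sessions wherever consecutive timestamps lie 60 or more seconds apart. A
standalone sketch (plain Java, not part of the commit; class and variable
names are made up) that derives the same counts from the per-user offsets in
the test data:

	import java.util.Arrays;
	import java.util.LinkedHashMap;
	import java.util.Map;

	public class SessionCountSketch {
		public static void main(String[] args) {
			// Per-user timestamp offsets (seconds from "now"), as in the test data above.
			Map<String, long[]> edits = new LinkedHashMap<>();
			edits.put("user1", new long[] {0, 0, 2, 230});
			edits.put("user2", new long[] {0, 0, 1, 5, 7, 8, 200, 230, 240, 241});
			edits.put("user3", new long[] {0, 10, 235, 236, 237, 238, 239, 240, 245});

			for (Map.Entry<String, long[]> e : edits.entrySet()) {
				long[] ts = e.getValue().clone();
				Arrays.sort(ts);
				int count = 1;
				for (int i = 1; i < ts.length; i++) {
					if (ts[i] - ts[i - 1] >= 60) {
						// A gap of one minute or more closes the current session.
						System.out.println("user: " + e.getKey() + " value:" + count);
						count = 0;
					}
					count++;
				}
				System.out.println("user: " + e.getKey() + " value:" + count);
			}
		}
	}

The printed counts match the session sizes in EXPECTED_RESULT: 3 and 1 for
user1, 6 and 4 for user2, and 2 and 7 for user3.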