You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:10 UTC
[15/50] [abbrv] incubator-beam git commit: [tests] integrate Wikipedia session test
[tests] integrate Wikipedia session test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37a9b292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37a9b292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37a9b292
Branch: refs/heads/master
Commit: 37a9b292d7895897225f7484bc18d0d2db55f547
Parents: 3227fcc
Author: Max <ma...@posteo.de>
Authored: Tue Feb 23 08:30:34 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../flink/dataflow/TopWikipediaSessions.java | 210 ------------------
.../dataflow/TopWikipediaSessionsITCase.java | 215 +++++++++++++------
2 files changed, 144 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
deleted file mode 100644
index ab5565a..0000000
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessions.java
+++ /dev/null
@@ -1,210 +0,0 @@
-///*
-// * Copyright (C) 2015 Google Inc.
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// * use this file except in compliance with the License. You may obtain a copy of
-// * the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// * License for the specific language governing permissions and limitations under
-// * the License.
-// */
-//
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.Default;
-//import com.google.cloud.dataflow.sdk.options.Description;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-//import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-//import com.google.cloud.dataflow.sdk.options.Validation;
-//import com.google.cloud.dataflow.sdk.transforms.Count;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn;
-//import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-//import com.google.cloud.dataflow.sdk.transforms.PTransform;
-//import com.google.cloud.dataflow.sdk.transforms.ParDo;
-//import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-//import com.google.cloud.dataflow.sdk.transforms.Top;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-//import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-//import com.google.cloud.dataflow.sdk.values.KV;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//
-//import org.joda.time.Duration;
-//import org.joda.time.Instant;
-//
-//import java.util.List;
-//
-///**
-// * Copied from {@link com.google.cloud.dataflow.examples.complete.TopWikipediaSessions} because the code
-// * is private there.
-// */
-//public class TopWikipediaSessions {
-// private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
-//
-// /**
-// * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
-// */
-// static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public void processElement(ProcessContext c) {
-// TableRow row = c.element();
-// int timestamp = (Integer) row.get("timestamp");
-// String userName = (String) row.get("contributor_username");
-// if (userName != null) {
-// // Sets the implicit timestamp field to be used in windowing.
-// c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-// }
-// }
-// }
-//
-// /**
-// * Computes the number of edits in each user session. A session is defined as
-// * a string of edits where each is separated from the next by less than an hour.
-// */
-// static class ComputeSessions
-// extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
-// return actions
-// .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-//
-// .apply(Count.<String>perElement());
-// }
-// }
-//
-// /**
-// * Computes the longest session ending in each month.
-// */
-// private static class TopPerMonth
-// extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
-// return sessions
-// .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
-//
-// .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public int compare(KV<String, Long> o1, KV<String, Long> o2) {
-// return Long.compare(o1.getValue(), o2.getValue());
-// }
-// }).withoutDefaults());
-// }
-// }
-//
-// static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
-// implements RequiresWindowAccess {
-//
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public void processElement(ProcessContext c) {
-// c.output(KV.of(
-// c.element().getKey() + " : " + c.window(), c.element().getValue()));
-// }
-// }
-//
-// static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
-// implements RequiresWindowAccess {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public void processElement(ProcessContext c) {
-// for (KV<String, Long> item : c.element()) {
-// String session = item.getKey();
-// long count = item.getValue();
-// c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
-// }
-// }
-// }
-//
-// static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
-//
-// private static final long serialVersionUID = 0;
-//
-// private final double samplingThreshold;
-//
-// public ComputeTopSessions(double samplingThreshold) {
-// this.samplingThreshold = samplingThreshold;
-// }
-//
-// @Override
-// public PCollection<String> apply(PCollection<TableRow> input) {
-// return input
-// .apply(ParDo.of(new ExtractUserAndTimestamp()))
-//
-// .apply(ParDo.named("SampleUsers").of(
-// new DoFn<String, String>() {
-// private static final long serialVersionUID = 0;
-//
-// @Override
-// public void processElement(ProcessContext c) {
-// if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
-// c.output(c.element());
-// }
-// }
-// }))
-//
-// .apply(new ComputeSessions())
-//
-// .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
-// .apply(new TopPerMonth())
-// .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
-// }
-// }
-//
-// /**
-// * Options supported by this class.
-// *
-// * <p> Inherits standard Dataflow configuration options.
-// */
-// private static interface Options extends PipelineOptions {
-// @Description(
-// "Input specified as a GCS path containing a BigQuery table exported as json")
-// @Default.String(EXPORTED_WIKI_TABLE)
-// String getInput();
-// void setInput(String value);
-//
-// @Description("File to output results to")
-// @Validation.Required
-// String getOutput();
-// void setOutput(String value);
-// }
-//
-// public static void main(String[] args) {
-// Options options = PipelineOptionsFactory.fromArgs(args)
-// .withValidation()
-// .as(Options.class);
-// DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-//
-// Pipeline p = Pipeline.create(dataflowOptions);
-//
-// double samplingThreshold = 0.1;
-//
-// p.apply(TextIO.Read
-// .from(options.getInput())
-// .withCoder(TableRowJsonCoder.of()))
-// .apply(new ComputeTopSessions(samplingThreshold))
-// .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-//
-// p.run();
-// }
-//}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/37a9b292/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
index 9c8147b..eb020c5 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
@@ -1,71 +1,144 @@
-///*
-// * Copyright 2015 Data Artisans GmbH
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.dataartisans.flink.dataflow;
-//
-//import com.google.api.services.bigquery.model.TableRow;
-//import com.google.cloud.dataflow.sdk.Pipeline;
-//import com.google.cloud.dataflow.sdk.io.TextIO;
-//import com.google.cloud.dataflow.sdk.transforms.Create;
-//import com.google.cloud.dataflow.sdk.values.PCollection;
-//import com.google.common.base.Joiner;
-//import org.apache.flink.test.util.JavaProgramTestBase;
-//
-//import java.util.Arrays;
-//
-//public class TopWikipediaSessionsITCase extends JavaProgramTestBase {
-// protected String resultPath;
-//
-// public TopWikipediaSessionsITCase(){
-// }
-//
-// static final String[] EXPECTED_RESULT = new String[] {
-// "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)"
-// + " : 3 : 1970-01-01T00:00:00.000Z",
-// "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)"
-// + " : 1 : 1970-02-01T00:00:00.000Z" };
-//
-// @Override
-// protected void preSubmit() throws Exception {
-// resultPath = getTempDirPath("result");
-// }
-//
-// @Override
-// protected void postSubmit() throws Exception {
-// compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-// }
-//
-// @Override
-// protected void testProgram() throws Exception {
-//
-// Pipeline p = FlinkTestPipeline.create();
-//
-// PCollection<String> output =
-// p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", 0).set
-// ("contributor_username", "user1"), new TableRow().set("timestamp", 1).set
-// ("contributor_username", "user1"), new TableRow().set("timestamp", 2).set
-// ("contributor_username", "user1"), new TableRow().set("timestamp", 0).set
-// ("contributor_username", "user2"), new TableRow().set("timestamp", 1).set
-// ("contributor_username", "user2"), new TableRow().set("timestamp", 3601).set
-// ("contributor_username", "user2"), new TableRow().set("timestamp", 3602).set
-// ("contributor_username", "user2"), new TableRow().set("timestamp", 35 * 24 * 3600)
-// .set("contributor_username", "user3"))))
-// .apply(new TopWikipediaSessions.ComputeTopSessions(1.0));
-//
-// output.apply(TextIO.Write.to(resultPath));
-//
-// p.run();
-// }
-//}
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Integration test for session windowing on the Flink streaming runner.
+ *
+ * <p>Each input {@link TableRow} carries a {@code timestamp} in epoch seconds and a
+ * {@code contributor_username}. Events are grouped into per-user session windows with a
+ * one-minute gap and counted; the output holds one line per (user, session).
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+  private static final long serialVersionUID = 0;
+
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase() {
+  }
+
+  /** One line per (user, session): user1 {3, 1}, user2 {6, 4}, user3 {2, 7} events. */
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  /** Allocates a temporary location for the pipeline output. */
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  /** Checks the written output against {@link #EXPECTED_RESULT}. */
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  /** Builds an input row with the given epoch-second timestamp and contributor name. */
+  private static TableRow edit(long timestampSeconds, String user) {
+    return new TableRow().set("timestamp", timestampSeconds).set("contributor_username", user);
+  }
+
+  /**
+   * Runs the pipeline: extract (user, timestamp) pairs, window them into sessions with a
+   * one-minute gap, count events per user and session, and write the formatted counts.
+   */
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createStreaming();
+
+    // Base time in epoch SECONDS, ~10s in the future; the offsets below are seconds too.
+    // ExtractUserAndTimestampFn converts seconds to milliseconds for the element Instant.
+    long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    // With a 60s gap: user1 -> {now, now, now+2}, {now+230};
+    // user2 -> {now, now, now+1, now+5, now+7, now+8}, {now+200, now+230, now+240, now+241};
+    // user3 -> {now, now+10}, {now+235 .. now+240, now+245}.
+    PCollection<KV<String, Long>> output =
+        p.apply(Create.of(Arrays.asList(
+            edit(now, "user1"), edit(now + 10, "user3"), edit(now, "user2"),
+            edit(now, "user1"), edit(now + 2, "user1"), edit(now, "user2"),
+            edit(now + 1, "user2"), edit(now + 5, "user2"), edit(now + 7, "user2"),
+            edit(now + 8, "user2"), edit(now + 200, "user2"), edit(now + 230, "user1"),
+            edit(now + 230, "user2"), edit(now + 240, "user2"), edit(now + 245, "user3"),
+            edit(now + 235, "user3"), edit(now + 236, "user3"), edit(now + 237, "user3"),
+            edit(now + 238, "user3"), edit(now + 239, "user3"), edit(now + 240, "user3"),
+            edit(now + 241, "user2"), edit(now, "user3"))))
+            .apply(ParDo.of(new ExtractUserAndTimestampFn()))
+            .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+            .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new FormatSessionCountFn()));
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+
+  /**
+   * Emits the contributor name of each row, timestamped from the row's {@code timestamp}
+   * field (epoch seconds). Rows without a contributor name are dropped.
+   */
+  private static class ExtractUserAndTimestampFn extends DoFn<TableRow, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      TableRow row = c.element();
+      String userName = (String) row.get("contributor_username");
+      if (userName != null) {
+        // Accept any numeric type: JSON-decoded rows may hold Integer rather than Long.
+        long timestampSeconds = ((Number) row.get("timestamp")).longValue();
+        c.outputWithTimestamp(userName, new Instant(timestampSeconds * 1000L));
+      }
+    }
+  }
+
+  /** Formats a (user, session event count) pair as {@code "user: <name> value:<count>"}. */
+  private static class FormatSessionCountFn extends DoFn<KV<String, Long>, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      KV<String, Long> el = c.element();
+      c.output("user: " + el.getKey() + " value:" + el.getValue());
+    }
+  }
+}