You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:50 UTC

[17/28] git commit: [streaming] WindowJoin Example refactored

[streaming] WindowJoin Example refactored


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0b721125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0b721125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0b721125

Branch: refs/heads/master
Commit: 0b7211250b120a2d96abd10daead4c0c55eba3c4
Parents: f74cb0a
Author: eszesd <es...@gmail.com>
Authored: Fri Aug 22 17:11:28 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../examples/window/join/GradeSource.java       |  47 +++++++++
 .../examples/window/join/SalarySource.java      |  47 +++++++++
 .../examples/window/join/WindowJoinLocal.java   |  24 ++---
 .../window/join/WindowJoinSourceOne.java        |  48 ---------
 .../window/join/WindowJoinSourceTwo.java        |  48 ---------
 .../examples/window/join/WindowJoinTask.java    | 104 +++++++++++--------
 6 files changed, 163 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
new file mode 100644
index 0000000..6579728
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+
+	private static final long serialVersionUID = -5897483980082089771L;
+
+	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+	private Random rand = new Random();
+	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
+	private Long progress = 0L;
+
+	@Override
+	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+		// Endlessly emit (name, grade in 1..5, progress counter) tuples, reusing one tuple instance.
+		while (true) {
+			outTuple.f0 = names[rand.nextInt(names.length)];
+			outTuple.f1 = rand.nextInt(5) + 1;
+			outTuple.f2 = progress;
+			out.collect(outTuple);
+			progress += 1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
new file mode 100644
index 0000000..cceccad
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+
+	private static final long serialVersionUID = 6670933703432267728L;
+
+	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+	private Random rand = new Random();
+	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
+	private Long progress = 0L;
+
+	@Override
+	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+		// Endlessly emit (name, salary in 0..9999, progress counter) tuples, reusing one tuple instance.
+		while (true) {
+			outTuple.f0 = names[rand.nextInt(names.length)];
+			outTuple.f1 = rand.nextInt(10000);
+			outTuple.f2 = progress;
+			out.collect(outTuple);
+			progress += 1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index e9cef1e..b298f3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -18,11 +18,8 @@
 package org.apache.flink.streaming.examples.window.join;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
 
 public class WindowJoinLocal {
 
@@ -32,23 +29,22 @@ public class WindowJoinLocal {
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.
 
-	//TODO update and reconsider
 	public static void main(String[] args) {
 
-		LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+				PARALLELISM).setBufferTimeout(100);
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM).setBufferTimeout(100);
+		DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
+				SOURCE_PARALLELISM);
 
-		DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
-				new WindowJoinSourceOne(), SOURCE_PARALLELISM);
+		DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
+				SOURCE_PARALLELISM);
 
-		@SuppressWarnings("unchecked")
-		DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
-				.addSource(new WindowJoinSourceTwo(), SOURCE_PARALLELISM).merge(dataStream1)
-				.partitionBy(1).flatMap(new WindowJoinTask());
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
+				.flatMap(new WindowJoinTask());
 
-		dataStream2.print();
+		System.out.println("(NAME, GRADE, SALARY)");
+		joinedStream.print();
 
 		env.execute();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
deleted file mode 100644
index d4651aa..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinSourceOne implements SourceFunction<Tuple4<String, String, Integer, Long>> {
-
-	private static final long serialVersionUID = 6670933703432267728L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
-	private Long progress = 0L;
-
-	@Override
-	public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-			outTuple.f0 = "salary";
-			outTuple.f1 = names[rand.nextInt(names.length)];
-			outTuple.f2 = rand.nextInt(10000);
-			outTuple.f3 = progress;
-			out.collect(outTuple);
-			progress += 1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
deleted file mode 100644
index 0df4fe7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinSourceTwo implements SourceFunction<Tuple4<String, String, Integer, Long>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
-	private Long progress = 0L;
-
-	@Override
-	public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = "grade";
-			outTuple.f1 = names[rand.nextInt(names.length)];
-			outTuple.f2 = rand.nextInt(5) + 1;
-			outTuple.f3 = progress;
-			out.collect(outTuple);
-			progress += 1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
index 8845257..7758615 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
@@ -21,13 +21,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
 import org.apache.flink.util.Collector;
 
 public class WindowJoinTask extends
-		RichFlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
+		RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
 
 	class SalaryProgress {
 		public SalaryProgress(Integer salary, Long progress) {
@@ -53,60 +52,75 @@ public class WindowJoinTask extends
 	private int windowSize = 100;
 	private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
 	private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
-
+	private String name;
+	private Long progress;
+	
 	public WindowJoinTask() {
 		gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
 		salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
+		name = new String();
+		progress = 0L;
 	}
 
+	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
+	
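+	// Joins the incoming grade with every buffered salary of the same name whose
+	// progress is not more than windowSize behind the grade's; older salaries are evicted.
+	// The grade is then buffered so that later salaries can be joined with it.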
 	@Override
-	public void flatMap(Tuple4<String, String, Integer, Long> value,
+	public void flatMap1(Tuple3<String, Integer, Long> value,
 			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		String streamId = value.f0;
-		String name = value.f1;
-		Long progress = value.f3;
+		name = value.f0;
+		progress = value.f2;
+
+		outputTuple.f0 = name;
+		outputTuple.f1 = value.f1;
 		
-		// Joins the input value with the already known values on a given interval. If it is a grade
-		// then with the salaries, if it is a salary then with the grades. Also
-		// stores the new element.
-		if (streamId.equals("grade")) {
-			if (salaryHashmap.containsKey(name)) {
-				Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
-				while (iterator.hasNext()) {
-					SalaryProgress entry = iterator.next();
-					if (progress - entry.progress > windowSize) {
-						iterator.remove();
-					} else {
-						Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
-								name, value.f2, entry.salary);
-						out.collect(outputTuple);
-					}
+		if (salaryHashmap.containsKey(name)) {
+			Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
+			while (iterator.hasNext()) {
+				SalaryProgress entry = iterator.next();
+				if (progress - entry.progress > windowSize) {
+					iterator.remove(); 
+				} else {
+					outputTuple.f2 = entry.salary;
+					out.collect(outputTuple);
 				}
-				if (!gradeHashmap.containsKey(name)) {
-					gradeHashmap.put(name, new LinkedList<GradeProgress>());
-				}
-				gradeHashmap.get(name).add(new GradeProgress(value.f2, progress));
-			} else {
-				if (gradeHashmap.containsKey(name)) {
-					Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
-					while (iterator.hasNext()) {
-						GradeProgress entry = iterator.next();
-						if (progress - entry.progress > windowSize) {
-							iterator.remove();
-						} else {
-							Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
-									name, entry.grade, value.f2);
-							out.collect(outputTuple);
+			}
+		}
+		if (!gradeHashmap.containsKey(name)) {
+			gradeHashmap.put(name, new LinkedList<GradeProgress>());
+		}
+		gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
+	}
 
-						}
-					}
-				}
-				if (!salaryHashmap.containsKey(name)) {
-					salaryHashmap.put(name, new LinkedList<SalaryProgress>());
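+	// Joins the incoming salary with every buffered grade of the same name whose
+	// progress is not more than windowSize behind the salary's; older grades are evicted.
+	// The salary is then buffered so that later grades can be joined with it.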
+	@Override
+	public void flatMap2(Tuple3<String, Integer, Long> value,
+			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+		name = value.f0;
+		progress = value.f2;
+
+		outputTuple.f0 = name;
+		outputTuple.f2 = value.f1;
+		
+		if (gradeHashmap.containsKey(name)) {
+			Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
+			while (iterator.hasNext()) {
+				GradeProgress entry = iterator.next();
+				if (progress - entry.progress > windowSize) {
+					iterator.remove();
+				} else {
+					outputTuple.f1 = entry.grade;
+					out.collect(outputTuple);
 				}
-				salaryHashmap.get(name).add(new SalaryProgress(value.f2, progress));
 			}
-
 		}
+		if (!salaryHashmap.containsKey(name)) {
+			salaryHashmap.put(name, new LinkedList<SalaryProgress>());
+		}
+		salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
 	}
 }