You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:50 UTC
[17/28] git commit: [streaming] WindowJoin Example refactored
[streaming] WindowJoin Example refactored
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0b721125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0b721125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0b721125
Branch: refs/heads/master
Commit: 0b7211250b120a2d96abd10daead4c0c55eba3c4
Parents: f74cb0a
Author: eszesd <es...@gmail.com>
Authored: Fri Aug 22 17:11:28 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../examples/window/join/GradeSource.java | 47 +++++++++
.../examples/window/join/SalarySource.java | 47 +++++++++
.../examples/window/join/WindowJoinLocal.java | 24 ++---
.../window/join/WindowJoinSourceOne.java | 48 ---------
.../window/join/WindowJoinSourceTwo.java | 48 ---------
.../examples/window/join/WindowJoinTask.java | 104 +++++++++++--------
6 files changed, 163 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
new file mode 100644
index 0000000..6579728
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Source that endlessly emits (name, grade, progress) tuples.
+ *
+ * <p>The name is picked at random from a fixed list, the grade is a random
+ * integer between 1 and 5 (inclusive), and progress is a counter that starts
+ * at 0 and grows by one after every emitted tuple. A single tuple instance is
+ * refilled and handed to the collector on every iteration.
+ */
+public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+
+    private static final long serialVersionUID = -5897483980082089771L;
+
+    // Pool of names the emitted tuples draw from.
+    private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+            "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+    private Random rand = new Random();
+    // Reused for every emission; only its fields are overwritten.
+    private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
+    // Number of tuples emitted so far; written into the third tuple field.
+    private Long progress = 0L;
+
+    /**
+     * Emits tuples in an endless loop; the loop itself has no exit condition.
+     *
+     * @param out collector receiving each generated tuple
+     */
+    @Override
+    public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+        for (;;) {
+            out.collect(nextGrade());
+            progress = progress + 1;
+        }
+    }
+
+    // Refills the shared tuple with a random name, a random grade and the current progress.
+    private Tuple3<String, Integer, Long> nextGrade() {
+        int nameIndex = rand.nextInt(names.length);
+        int grade = rand.nextInt(5) + 1;
+        outTuple.f0 = names[nameIndex];
+        outTuple.f1 = grade;
+        outTuple.f2 = progress;
+        return outTuple;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
new file mode 100644
index 0000000..cceccad
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Source that endlessly emits (name, salary, progress) tuples.
+ *
+ * <p>The name is picked at random from a fixed list, the salary is a random
+ * integer between 0 and 9999 (inclusive), and progress is a counter that
+ * starts at 0 and grows by one after every emitted tuple. A single tuple
+ * instance is refilled and handed to the collector on every iteration.
+ */
+public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+
+    private static final long serialVersionUID = 6670933703432267728L;
+
+    // Pool of names the emitted tuples draw from.
+    private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+            "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+    private Random rand = new Random();
+    // Reused for every emission; only its fields are overwritten.
+    private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
+    // Number of tuples emitted so far; written into the third tuple field.
+    private Long progress = 0L;
+
+    /**
+     * Emits tuples in an endless loop; the loop itself has no exit condition.
+     *
+     * @param out collector receiving each generated tuple
+     */
+    @Override
+    public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+        for (;;) {
+            out.collect(nextSalary());
+            progress = progress + 1;
+        }
+    }
+
+    // Refills the shared tuple with a random name, a random salary and the current progress.
+    private Tuple3<String, Integer, Long> nextSalary() {
+        int nameIndex = rand.nextInt(names.length);
+        int salary = rand.nextInt(10000);
+        outTuple.f0 = names[nameIndex];
+        outTuple.f1 = salary;
+        outTuple.f2 = progress;
+        return outTuple;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index e9cef1e..b298f3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -18,11 +18,8 @@
package org.apache.flink.streaming.examples.window.join;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
public class WindowJoinLocal {
@@ -32,23 +29,22 @@ public class WindowJoinLocal {
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
- //TODO update and reconsider
public static void main(String[] args) {
- LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+ PARALLELISM).setBufferTimeout(100);
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM).setBufferTimeout(100);
+ DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
+ SOURCE_PARALLELISM);
- DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
- new WindowJoinSourceOne(), SOURCE_PARALLELISM);
+ DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
+ SOURCE_PARALLELISM);
- @SuppressWarnings("unchecked")
- DataStream<Tuple3<String, Integer, Integer>> dataStream2 = env
- .addSource(new WindowJoinSourceTwo(), SOURCE_PARALLELISM).merge(dataStream1)
- .partitionBy(1).flatMap(new WindowJoinTask());
+ DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
+ .flatMap(new WindowJoinTask());
- dataStream2.print();
+ System.out.println("(NAME, GRADE, SALARY)");
+ joinedStream.print();
env.execute();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
deleted file mode 100644
index d4651aa..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceOne.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinSourceOne implements SourceFunction<Tuple4<String, String, Integer, Long>> {
-
- private static final long serialVersionUID = 6670933703432267728L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
- private Random rand = new Random();
- private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
- private Long progress = 0L;
-
- @Override
- public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
- // Continuously emit tuples with random names and integers (salaries).
- while (true) {
- outTuple.f0 = "salary";
- outTuple.f1 = names[rand.nextInt(names.length)];
- outTuple.f2 = rand.nextInt(10000);
- outTuple.f3 = progress;
- out.collect(outTuple);
- progress += 1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
deleted file mode 100644
index 0df4fe7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinSourceTwo.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinSourceTwo implements SourceFunction<Tuple4<String, String, Integer, Long>> {
-
- private static final long serialVersionUID = -5897483980082089771L;
-
- private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
- "andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
- private Random rand = new Random();
- private Tuple4<String, String, Integer, Long> outTuple = new Tuple4<String, String, Integer, Long>();
- private Long progress = 0L;
-
- @Override
- public void invoke(Collector<Tuple4<String, String, Integer, Long>> out) throws Exception {
- // Continuously emit tuples with random names and integers (grades).
- while (true) {
- outTuple.f0 = "grade";
- outTuple.f1 = names[rand.nextInt(names.length)];
- outTuple.f2 = rand.nextInt(5) + 1;
- outTuple.f3 = progress;
- out.collect(outTuple);
- progress += 1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0b721125/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
index 8845257..7758615 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
@@ -21,13 +21,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class WindowJoinTask extends
- RichFlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
+ RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
class SalaryProgress {
public SalaryProgress(Integer salary, Long progress) {
@@ -53,60 +52,75 @@ public class WindowJoinTask extends
private int windowSize = 100;
private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
-
+ private String name;
+ private Long progress;
+
public WindowJoinTask() {
gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
+ name = new String();
+ progress = 0L;
}
+ Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
+
+ // Joins the incoming grade with every salary stored under the same name that
+ // is at most windowSize progress steps old; older salaries are evicted.
+ // Also stores the new grade so that later salaries can be joined with it.
@Override
- public void flatMap(Tuple4<String, String, Integer, Long> value,
+ public void flatMap1(Tuple3<String, Integer, Long> value,
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
- String streamId = value.f0;
- String name = value.f1;
- Long progress = value.f3;
+ name = value.f0;
+ progress = value.f2;
+
+ outputTuple.f0 = name;
+ outputTuple.f1 = value.f1;
- // Joins the input value with the already known values on a given interval. If it is a grade
- // then with the salaries, if it is a salary then with the grades. Also
- // stores the new element.
- if (streamId.equals("grade")) {
- if (salaryHashmap.containsKey(name)) {
- Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- SalaryProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
- name, value.f2, entry.salary);
- out.collect(outputTuple);
- }
+ if (salaryHashmap.containsKey(name)) {
+ Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
+ while (iterator.hasNext()) {
+ SalaryProgress entry = iterator.next();
+ if (progress - entry.progress > windowSize) {
+ iterator.remove();
+ } else {
+ outputTuple.f2 = entry.salary;
+ out.collect(outputTuple);
}
- if (!gradeHashmap.containsKey(name)) {
- gradeHashmap.put(name, new LinkedList<GradeProgress>());
- }
- gradeHashmap.get(name).add(new GradeProgress(value.f2, progress));
- } else {
- if (gradeHashmap.containsKey(name)) {
- Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
- while (iterator.hasNext()) {
- GradeProgress entry = iterator.next();
- if (progress - entry.progress > windowSize) {
- iterator.remove();
- } else {
- Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
- name, entry.grade, value.f2);
- out.collect(outputTuple);
+ }
+ }
+ if (!gradeHashmap.containsKey(name)) {
+ gradeHashmap.put(name, new LinkedList<GradeProgress>());
+ }
+ gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
+ }
- }
- }
- }
- if (!salaryHashmap.containsKey(name)) {
- salaryHashmap.put(name, new LinkedList<SalaryProgress>());
+ // Joins the incoming salary with every grade stored under the same name that
+ // is at most windowSize progress steps old; older grades are evicted.
+ // Also stores the new salary so that later grades can be joined with it.
+ @Override
+ public void flatMap2(Tuple3<String, Integer, Long> value,
+ Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+ name = value.f0;
+ progress = value.f2;
+
+ outputTuple.f0 = name;
+ outputTuple.f2 = value.f1;
+
+ if (gradeHashmap.containsKey(name)) {
+ Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
+ while (iterator.hasNext()) {
+ GradeProgress entry = iterator.next();
+ if (progress - entry.progress > windowSize) {
+ iterator.remove();
+ } else {
+ outputTuple.f1 = entry.grade;
+ out.collect(outputTuple);
}
- salaryHashmap.get(name).add(new SalaryProgress(value.f2, progress));
}
-
}
+ if (!salaryHashmap.containsKey(name)) {
+ salaryHashmap.put(name, new LinkedList<SalaryProgress>());
+ }
+ salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
}
}