You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:53 UTC
[4/8] flink git commit: [FLINK-2819] Add Windowed Join/CoGroup
Operator Based on Tagged Union
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 5230e9b..8abf9d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -19,31 +19,36 @@ package org.apache.flink.streaming.examples.join;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* Example illustrating join over tumbling windows of streams in Flink.
- * <p/>
+ *
* <p>
- * his example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- * <p/>
- * <p/>
+ * This example will join two streams with a tumbling window. One which emits grades and one which
+ * emits salaries of people. The input format for both sources has an additional timestamp
+ * as field 0. This is used to do event-time windowing. The timestamps must be
+ * monotonically increasing.
+ *
* This example shows how to:
* <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ * </ul>
*/
public class WindowJoin {
@@ -51,9 +56,6 @@ public class WindowJoin {
// PROGRAM
// *************************************************************************
- private static DataStream<Tuple2<String, Integer>> grades;
- private static DataStream<Tuple2<String, Integer>> salaries;
-
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
@@ -62,18 +64,25 @@ public class WindowJoin {
// obtain execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableTimestamps();
// connect to the data sources for grades and salaries
- setInputStreams(env);
+ Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
+ DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
+ DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
+
+ // extract the timestamps
+ grades = grades.extractTimestamp(new MyTimestampExtractor());
+ salaries = salaries.extractTimestamp(new MyTimestampExtractor());
// apply a temporal join over the two streams based on the names over
// tumbling event-time windows of 5 milliseconds
DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
.join(salaries)
- .onWindow(1, new MyTimestamp(0), new MyTimestamp(0))
- .where(0)
- .equalTo(0)
- .with(new MyJoinFunction());
+ .where(new NameKeySelector())
+ .equalTo(new NameKeySelector())
+ .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
+ .apply(new MyJoinFunction());
// emit result
if (fileOutput) {
@@ -98,24 +107,25 @@ public class WindowJoin {
/**
* Continuously emit tuples with random names and integers (grades).
*/
- public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+ public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
private static final long serialVersionUID = 1L;
private Random rand;
- private Tuple2<String, Integer> outTuple;
+ private Tuple3<Long, String, Integer> outTuple;
private volatile boolean isRunning = true;
private int counter;
public GradeSource() {
rand = new Random();
- outTuple = new Tuple2<String, Integer>();
+ outTuple = new Tuple3<>();
}
@Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
while (isRunning && counter < 100) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+ outTuple.f0 = System.currentTimeMillis();
+ outTuple.f1 = names[rand.nextInt(names.length)];
+ outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
counter++;
ctx.collect(outTuple);
@@ -131,27 +141,28 @@ public class WindowJoin {
/**
* Continuously emit tuples with random names and integers (salaries).
*/
- public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+ public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
private static final long serialVersionUID = 1L;
private transient Random rand;
- private transient Tuple2<String, Integer> outTuple;
+ private transient Tuple3<Long, String, Integer> outTuple;
private volatile boolean isRunning;
private int counter;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
rand = new Random();
- outTuple = new Tuple2<String, Integer>();
+ outTuple = new Tuple3<Long, String, Integer>();
isRunning = true;
}
@Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
while (isRunning && counter < 100) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+ outTuple.f0 = System.currentTimeMillis();
+ outTuple.f1 = names[rand.nextInt(names.length)];
+ outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
counter++;
ctx.collect(outTuple);
@@ -164,7 +175,7 @@ public class WindowJoin {
}
}
- public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {
+ public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
private static final long serialVersionUID = 1L;
@@ -175,44 +186,55 @@ public class WindowJoin {
}
@Override
- public Tuple2<String, Integer> map(String line) throws Exception {
+ public Tuple3<Long, String, Integer> map(String line) throws Exception {
record = line.substring(1, line.length() - 1).split(",");
- return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1]));
+ return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
}
}
public static class MyJoinFunction
implements
- JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+ JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 1L;
- private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>();
+ private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
@Override
- public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
- Tuple2<String, Integer> second) throws Exception {
- joined.f0 = first.f0;
- joined.f1 = first.f1;
- joined.f2 = second.f1;
+ public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
+ Tuple3<Long, String, Integer> second) throws Exception {
+ joined.f0 = first.f1;
+ joined.f1 = first.f2;
+ joined.f2 = second.f2;
return joined;
}
}
- public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> {
-
+ private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
private static final long serialVersionUID = 1L;
- private int counter;
+ @Override
+ public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+ return element.f0;
+ }
- public MyTimestamp(int starttime) {
- this.counter = starttime;
+ @Override
+ public long emitWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+ return element.f0 - 1;
}
@Override
- public long getTimestamp(Tuple2<String, Integer> value) {
- counter += SLEEP_TIME;
- return counter;
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
+ }
+ }
+
+ private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
+ return value.f1;
}
}
@@ -253,7 +275,12 @@ public class WindowJoin {
return true;
}
- private static void setInputStreams(StreamExecutionEnvironment env) {
+ private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
+ StreamExecutionEnvironment env) {
+
+ DataStream<Tuple3<Long, String, Integer>> grades;
+ DataStream<Tuple3<Long, String, Integer>> salaries;
+
if (fileInput) {
grades = env.readTextFile(gradesPath).map(new MySourceMap());
salaries = env.readTextFile(salariesPath).map(new MySourceMap());
@@ -261,5 +288,8 @@ public class WindowJoin {
grades = env.addSource(new GradeSource());
salaries = env.addSource(new SalarySource());
}
+
+ return Tuple2.of(grades, salaries);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
index 23d29b1..15c1280 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -19,42 +19,42 @@ package org.apache.flink.streaming.examples.join.util;
public class WindowJoinData {
- public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" +
- "(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" +
- "(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" +
- "(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" +
- "(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" +
- "(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" +
- "(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" +
- "(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" +
- "(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" +
- "(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + "(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" +
- "(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" +
- "(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" +
- "(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" +
- "(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + "(jerry,5)\n" +
- "(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)";
+ public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
+ "(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
+ "(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
+ "(3,tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
+ "(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
+ "(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
+ "(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
+ "(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
+ "(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
+ "(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
+ "(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
+ "(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
+ "(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
+ "(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
+ "(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
- public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" +
- "(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" +
- "(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" +
- "(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" +
- "(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" +
- "(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" +
- "(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" +
- "(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" +
- "(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" +
- "(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" +
- "(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" +
- "(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" +
- "(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" +
- "(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" +
- "(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" +
- "(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" +
- "(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + "(tom,1336)\n" + "(jerry,7500)\n" +
- "(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" +
- "(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" +
- "(grace,2977)\n" + "(grace,889)\n" + "(john,1338)";
+ public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
+ "(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
+ "(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
+ "(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
+ "(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
+ "(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
+ "(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
+ "(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
+ "(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
+ "(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
+ "(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
+ "(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
+ "(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
+ "(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
+ "(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
+ "(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
+ "(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
+ "(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
+ "(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
+ "(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
private WindowJoinData() {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 239f1fa..225dab7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.scala.examples.join
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
import scala.Stream._
import scala.language.postfixOps
@@ -32,8 +34,8 @@ object WindowJoin {
// PROGRAM
// *************************************************************************
- case class Grade(name: String, grade: Int)
- case class Salary(name: String, salary: Int)
+ case class Grade(time: Long, name: String, grade: Int)
+ case class Salary(time: Long, name: String, salary: Int)
case class Person(name: String, grade: Int, salary: Int)
def main(args: Array[String]) {
@@ -43,6 +45,7 @@ object WindowJoin {
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableTimestamps()
//Create streams for grades and salaries by mapping the inputs to the corresponding objects
val grades = setGradesInput(env)
@@ -50,11 +53,11 @@ object WindowJoin {
//Join the two input streams by name on the last 2 seconds every second and create new
//Person objects containing both grade and salary
- val joined =
- grades.join(salaries).onWindow(2, TimeUnit.SECONDS)
- .every(1, TimeUnit.SECONDS)
- .where("name")
- .equalTo("name") { (g, s) => Person(g.name, g.grade, s.salary) }
+ val joined = grades.join(salaries)
+ .where(_.name)
+ .equalTo(_.name)
+ .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+ .apply { (g, s) => Person(g.name, g.grade, s.salary) }
if (fileOutput) {
joined.writeAsText(outputPath)
@@ -74,27 +77,27 @@ object WindowJoin {
val salaryMax = 10000
val sleepInterval = 100
- def gradeStream(): Stream[(String, Int)] = {
- def gradeMapper(names: Array[String])(x: Int): (String, Int) =
+ def gradeStream: Stream[(Long, String, Int)] = {
+ def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) =
{
if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
- (names(Random.nextInt(names.length)), Random.nextInt(gradeCount))
+ (System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount))
}
range(1, 100).map(gradeMapper(names))
}
- def salaryStream(): Stream[(String, Int)] = {
- def salaryMapper(x: Int): (String, Int) =
+ def salaryStream: Stream[(Long, String, Int)] = {
+ def salaryMapper(x: Int): (Long, String, Int) =
{
if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
- (names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
+ (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
}
range(1, 100).map(salaryMapper)
}
- def parseMap(line : String): (String, Int) = {
+ def parseMap(line : String): (Long, String, Int) = {
val record = line.substring(1, line.length - 1).split(",")
- (record(0), record(1).toInt)
+ (record(0).toLong, record(1), record(2).toInt)
}
// *************************************************************************
@@ -130,23 +133,23 @@ object WindowJoin {
System.out.println(" Provide parameter to write to file.")
System.out.println(" Usage: WindowJoin <result path>")
}
- return true
+ true
}
private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = {
if (fileInput) {
- env.readTextFile(gradesPath).map(parseMap(_)).map(x => Grade(x._1, x._2))
+ env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
} else {
- env.fromCollection(gradeStream).map(x => Grade(x._1, x._2))
+ env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
}
}
private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = {
if (fileInput) {
- env.readTextFile(salariesPath).map(parseMap(_)).map(x => Salary(x._1, x._2))
+ env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
}
else {
- env.fromCollection(salaryStream).map(x => Salary(x._1, x._2))
+ env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
index aae4b93..e657b67 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -1,51 +1,50 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-// TODO: reactivate once we have new join implementation
-//package org.apache.flink.streaming.test.exampleJavaPrograms.join;
-//
-//import org.apache.flink.streaming.examples.join.WindowJoin;
-//import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-//import org.apache.flink.streaming.util.StreamingProgramTestBase;
-//
-//public class WindowJoinITCase extends StreamingProgramTestBase {
-//
-// protected String gradesPath;
-// protected String salariesPath;
-// protected String resultPath;
-//
-// @Override
-// protected void preSubmit() throws Exception {
-// gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-// salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-// resultPath = getTempDirPath("result");
-// }
-//
-// @Override
-// protected void postSubmit() throws Exception {
-// // since the two sides of the join might have different speed
-// // the exact output can not be checked just whether it is well-formed
-// // checks that the result lines look like e.g. (bob, 2, 2015)
-// checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
-// }
-//
-// @Override
-// protected void testProgram() throws Exception {
-// WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
-// }
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.join;
+
+import org.apache.flink.streaming.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+ protected String gradesPath;
+ protected String salariesPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+ salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ // since the two sides of the join might run at different speeds,
+ // the exact output cannot be checked, only whether it is well-formed;
+ // checks that the result lines look like e.g. (bob,2,2015)
+ checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
index 0aa884f..08ce890 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
@@ -1,51 +1,50 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-// TODO: reactivate once we have new join implementation
-//package org.apache.flink.streaming.test.exampleScalaPrograms.join;
-//
-//import org.apache.flink.streaming.scala.examples.join.WindowJoin;
-//import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-//import org.apache.flink.streaming.util.StreamingProgramTestBase;
-//
-//public class WindowJoinITCase extends StreamingProgramTestBase {
-//
-// protected String gradesPath;
-// protected String salariesPath;
-// protected String resultPath;
-//
-// @Override
-// protected void preSubmit() throws Exception {
-// gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-// salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-// resultPath = getTempDirPath("result");
-// }
-//
-// @Override
-// protected void postSubmit() throws Exception {
-// // since the two sides of the join might have different speed
-// // the exact output can not be checked just whether it is well-formed
-// // checks that the result lines look like e.g. Person(bob, 2, 2015)
-// checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
-// }
-//
-// @Override
-// protected void testProgram() throws Exception {
-// WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
-// }
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.join;
+
+import org.apache.flink.streaming.scala.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+ protected String gradesPath;
+ protected String salariesPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+ salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ // since the two sides of the join might run at different speeds,
+ // the exact output cannot be checked, only whether it is well-formed;
+ // checks that the result lines look like e.g. Person(bob,2,2015)
+ checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
new file mode 100644
index 0000000..1b16e44
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+import scala.reflect.ClassTag
+
+/**
+ * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * To finalize the co-group operation you also need to specify a [[KeySelector]] for
+ * both the first and second input and a [[WindowAssigner]]
+ *
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * Example:
+ *
+ * {{{
+ * val one: DataStream[(String, Int)] = ...
+ * val two: DataStream[(String, Int)] = ...
+ *
+ * val result = one.coGroup(two)
+ * .where {t => ... }
+ * .equalTo {t => ... }
+ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ * .apply(new MyCoGroupFunction())
+ * }}}
+ */
+object CoGroupedStreams {
+
+ /**
+ * A co-group operation that does not yet have its [[KeySelector]]s defined.
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ */
+ class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the first input.
+ */
+ def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T1, KEY] {
+ def getKey(in: T1) = cleanFun(in)
+ }
+ new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+ }
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the second input.
+ */
+ def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T2, KEY] {
+ def getKey(in: T2) = cleanFun(in)
+ }
+ new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+ /**
+ * A co-group operation that has [[KeySelector]]s defined for either both or
+ * one input.
+ *
+ * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
+ * before you can proceed with specifying a [[WindowAssigner]] using [[window()]].
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ * @tparam KEY Type of the key. This must be the same for both inputs
+ */
+ class WithKey[T1, T2, KEY](
+ input1: DataStream[T1],
+ input2: DataStream[T2],
+ keySelector1: KeySelector[T1, KEY],
+ keySelector2: KeySelector[T2, KEY]) {
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the first input.
+ */
+ def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T1, KEY] {
+ def getKey(in: T1) = cleanFun(in)
+ }
+ new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+ }
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the second input.
+ */
+ def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T2, KEY] {
+ def getKey(in: T2) = cleanFun(in)
+ }
+ new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+ }
+
+ /**
+ * Specifies the window on which the co-group operation works. A [[KeySelector]] must have
+ * been set for both inputs; otherwise an UnsupportedOperationException is thrown.
+ */
+ def window[W <: Window](
+ assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+ : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+ if (keySelector1 == null || keySelector2 == null) {
+ throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
+ " inputs using where() and equalTo().")
+ }
+ new CoGroupedStreams.WithWindow[T1, T2, KEY, W](
+ input1, input2,
+ keySelector1,
+ keySelector2,
+ clean(assigner),
+ null, // no trigger specified yet
+ null) // no evictor specified yet
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+ /**
+ * A co-group operation that has [[KeySelector]]s defined for both inputs as
+ * well as a [[WindowAssigner]].
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ * @tparam KEY Type of the key. This must be the same for both inputs
+ * @tparam W Type of [[Window]] on which the co-group operation works.
+ */
+ class WithWindow[T1, T2, KEY, W <: Window](
+ input1: DataStream[T1],
+ input2: DataStream[T2],
+ keySelector1: KeySelector[T1, KEY],
+ keySelector2: KeySelector[T2, KEY],
+ windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+ trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+ evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+
+ /**
+ * Sets the [[Trigger]] that should be used to trigger window emission.
+ */
+ def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+ : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+ new WithWindow[T1, T2, KEY, W](
+ input1,
+ input2,
+ keySelector1,
+ keySelector2,
+ windowAssigner,
+ newTrigger,
+ evictor)
+ }
+
+ /**
+ * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+ *
+ * Note: When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+ : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+ new WithWindow[T1, T2, KEY, W](
+ input1,
+ input2,
+ keySelector1,
+ keySelector2,
+ windowAssigner,
+ trigger,
+ newEvictor)
+ }
+
+ /**
+ * Completes the co-group operation with a Scala function that is called with the elements
+ * of both inputs for a windowed group; its return value is emitted as the result.
+ */
+ def apply[O: TypeInformation: ClassTag](
+ fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
+ require(fun != null, "CoGroup function must not be null.")
+
+ val coGrouper = new CoGroupFunction[T1, T2, O] {
+ val cleanFun = clean(fun)
+ def coGroup(
+ left: java.lang.Iterable[T1],
+ right: java.lang.Iterable[T2], out: Collector[O]) = {
+ out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
+ }
+ }
+ apply(coGrouper)
+ }
+
+ /**
+ * Completes the co-group operation with a Scala function that is called with the elements
+ * of both inputs for a windowed group and emits its results through the given [[Collector]].
+ */
+ def apply[O: TypeInformation: ClassTag](
+ fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
+ require(fun != null, "CoGroup function must not be null.")
+
+ val coGrouper = new CoGroupFunction[T1, T2, O] {
+ val cleanFun = clean(fun)
+ def coGroup(
+ left: java.lang.Iterable[T1],
+ right: java.lang.Iterable[T2], out: Collector[O]) = {
+ cleanFun(left.iterator.asScala, right.iterator.asScala, out)
+ }
+ }
+ apply(coGrouper)
+ }
+
+ /**
+ * Completes the co-group operation with the given [[CoGroupFunction]]. The function is
+ * cleaned and passed to a Java co-group set up with the keys, window, trigger and evictor.
+ */
+ def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
+
+ val coGroup = JavaCoGroupedStreams.createCoGroup(input1.getJavaStream, input2.getJavaStream)
+
+ coGroup
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(clean(function), implicitly[TypeInformation[T]])
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+
+ /**
+ * Creates a new co-group operation from the two given inputs.
+ */
+ def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
+ : CoGroupedStreams.Unspecified[T1, T2] = {
+ new CoGroupedStreams.Unspecified[T1, T2](input1, input2)
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 8aeacb4..7babc40 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -751,18 +751,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
- * Initiates a temporal Join transformation that joins the elements of two
- * data streams on key equality over a specified time window.
- *
- * This method returns a StreamJoinOperator on which the
- * .onWindow(..) should be called to define the
- * window, and then the .where(..) and .equalTo(..) methods can be used to defin
- * the join keys.</p> The user can also use the apply method of the returned JoinedStream
- * to use custom join function.
- *
+ * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
+ * and window can be specified.
*/
- def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
- new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+ def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams.Unspecified[T, T2] = {
+ CoGroupedStreams.createCoGroup(this, otherStream)
+ }
+
+ /**
+ * Creates a join operation. See [[JoinedStreams]] for an example of how the keys
+ * and window can be specified.
+ */
+ def join[T2](otherStream: DataStream[T2]): JoinedStreams.Unspecified[T, T2] = {
+ JoinedStreams.createJoin(this, otherStream)
+ }
/**
* Writes a DataStream to the standard output stream (stdout). For each
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
new file mode 100644
index 0000000..be059b8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.reflect.ClassTag
+
+/**
+ * `JoinedStreams` represents two [[DataStream]]s that have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * To finalize the join operation you also need to specify a [[KeySelector]] for
+ * both the first and second input and a [[WindowAssigner]]
+ *
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * Example:
+ *
+ * {{{
+ * val one: DataStream[(String, Int)] = ...
+ * val two: DataStream[(String, Int)] = ...
+ *
+ * val result = one.join(two)
+ * .where {t => ... }
+ * .equalTo {t => ... }
+ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ * .apply(new MyJoinFunction())
+ * }}}
+ */
+object JoinedStreams {
+
+ /**
+ * A join operation that does not yet have its [[KeySelector]]s defined.
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ */
+ class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the first input.
+ */
+ def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T1, KEY] {
+ def getKey(in: T1) = cleanFun(in)
+ }
+ new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+ }
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the second input.
+ */
+ def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T2, KEY] {
+ def getKey(in: T2) = cleanFun(in)
+ }
+ new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+ /**
+ * A join operation that has [[KeySelector]]s defined for either both or
+ * one input.
+ *
+ * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
+ * before you can proceed with specifying a [[WindowAssigner]] using [[window()]].
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ * @tparam KEY Type of the key. This must be the same for both inputs
+ */
+ class WithKey[T1, T2, KEY](
+ input1: DataStream[T1],
+ input2: DataStream[T2],
+ keySelector1: KeySelector[T1, KEY],
+ keySelector2: KeySelector[T2, KEY]) {
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the first input.
+ */
+ def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T1, KEY] {
+ def getKey(in: T1) = cleanFun(in)
+ }
+ new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+ }
+
+ /**
+ * Specifies a [[KeySelector]] for elements from the second input.
+ */
+ def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ val cleanFun = clean(keySelector)
+ val javaSelector = new KeySelector[T2, KEY] {
+ def getKey(in: T2) = cleanFun(in)
+ }
+ new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+ }
+
+ /**
+ * Specifies the window on which the join operation works. A [[KeySelector]] must have
+ * been set for both inputs; otherwise an UnsupportedOperationException is thrown.
+ */
+ def window[W <: Window](
+ assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+ : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+ if (keySelector1 == null || keySelector2 == null) {
+ throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
+ " inputs using where() and equalTo().")
+ }
+ new JoinedStreams.WithWindow[T1, T2, KEY, W](
+ input1, input2,
+ keySelector1,
+ keySelector2,
+ clean(assigner),
+ null, // no trigger specified yet
+ null) // no evictor specified yet
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+ /**
+ * A join operation that has [[KeySelector]]s defined for both inputs as
+ * well as a [[WindowAssigner]].
+ *
+ * @tparam T1 Type of the elements from the first input
+ * @tparam T2 Type of the elements from the second input
+ * @tparam KEY Type of the key. This must be the same for both inputs
+ * @tparam W Type of [[Window]] on which the join operation works.
+ */
+ class WithWindow[T1, T2, KEY, W <: Window](
+ input1: DataStream[T1],
+ input2: DataStream[T2],
+ keySelector1: KeySelector[T1, KEY],
+ keySelector2: KeySelector[T2, KEY],
+ windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+ trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+ evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+
+ /**
+ * Sets the [[Trigger]] that should be used to trigger window emission.
+ */
+ def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+ : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+ new WithWindow[T1, T2, KEY, W](
+ input1,
+ input2,
+ keySelector1,
+ keySelector2,
+ windowAssigner,
+ newTrigger,
+ evictor)
+ }
+
+ /**
+ * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+ *
+ * Note: When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+ : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+ new WithWindow[T1, T2, KEY, W](
+ input1,
+ input2,
+ keySelector1,
+ keySelector2,
+ windowAssigner,
+ trigger,
+ newEvictor)
+ }
+
+ /**
+ * Completes the join operation with a Scala function that is called with one element from
+ * each input; its return value is emitted as the result for that pair.
+ */
+ def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = {
+ require(fun != null, "Join function must not be null.")
+
+ val joiner = new FlatJoinFunction[T1, T2, O] {
+ val cleanFun = clean(fun)
+ def join(left: T1, right: T2, out: Collector[O]) = {
+ out.collect(cleanFun(left, right))
+ }
+ }
+ apply(joiner)
+ }
+
+ /**
+ * Completes the join operation with a Scala function that is called with one element from
+ * each input and emits its results through the given [[Collector]].
+ */
+
+ def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
+ require(fun != null, "Join function must not be null.")
+
+ val joiner = new FlatJoinFunction[T1, T2, O] {
+ val cleanFun = clean(fun)
+ def join(left: T1, right: T2, out: Collector[O]) = {
+ cleanFun(left, right, out)
+ }
+ }
+ apply(joiner)
+ }
+
+ /**
+ * Completes the join operation with the given [[JoinFunction]]. The function is cleaned and
+ * passed to a Java join set up with the key selectors, window assigner, trigger and evictor.
+ */
+ def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
+
+ val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+
+ join
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(clean(function), implicitly[TypeInformation[T]])
+ }
+
+ /**
+ * Completes the join operation with the given [[FlatJoinFunction]]. The function is cleaned
+ * and passed to a Java join set up with the key selectors, window assigner, trigger and evictor.
+ */
+ def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
+
+ val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+
+ join
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(clean(function), implicitly[TypeInformation[T]])
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+ }
+ }
+
+
+ /**
+ * Creates a new join operation from the two given inputs.
+ */
+ def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
+ : JoinedStreams.Unspecified[T1, T2] = {
+ new JoinedStreams.Unspecified[T1, T2](input1, input2)
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
deleted file mode 100644
index e0bbaf8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.util.keys.KeySelectorUtil
-
-import scala.Array.canBuildFrom
-import scala.reflect.ClassTag
-
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
-
- override def createNextWindowOperator() = {
- new StreamJoinOperator.JoinWindow[I1, I2](this)
- }
-}
-
-object StreamJoinOperator {
-
- class JoinWindow[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2]) extends
- TemporalWindow[JoinWindow[I1, I2]] {
-
- private[flink] val type1 = op.input1.getType()
-
- /**
- * Continues a temporal Join transformation by defining
- * the fields in the first stream to be used as keys for the join.
- * The resulting incomplete join can be completed by JoinPredicate.equalTo()
- * to define the second key.
- */
- def where(fields: Int*) = {
- new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys(fields.toArray, type1),
- type1,
- op.input1.getExecutionEnvironment.getConfig))
- }
-
- /**
- * Continues a temporal Join transformation by defining
- * the fields in the first stream to be used as keys for the join.
- * The resulting incomplete join can be completed by JoinPredicate.equalTo()
- * to define the second key.
- */
- def where(firstField: String, otherFields: String*) =
- new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1),
- type1,
- op.input1.getExecutionEnvironment.getConfig))
-
- /**
- * Continues a temporal Join transformation by defining
- * the keyselector function that will be used to extract keys from the first stream
- * for the join.
- * The resulting incomplete join can be completed by JoinPredicate.equalTo()
- * to define the second key.
- */
- def where[K: TypeInformation](fun: (I1) => K) = {
- val keyType = implicitly[TypeInformation[K]]
- val cleanFun = op.input1.clean(fun)
- val keyExtractor = new KeySelector[I1, K] {
- def getKey(in: I1) = cleanFun(in)
- }
- new JoinPredicate[I1, I2](op, keyExtractor)
- }
-
- override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = {
- every(timeUnit.toMillis(length))
- }
-
- override def every(length: Long): JoinWindow[I1, I2] = {
- op.slideInterval = length
- this
- }
-
- }
-
- class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
- private[flink] val keys1: KeySelector[I1, _]) {
- private[flink] var keys2: KeySelector[I2, _] = null
- private[flink] val type2 = op.input2.getType()
-
- /**
- * Creates a temporal join transformation by defining the second join key.
- * The returned transformation wrapes each joined element pair in a tuple2:
- * (first, second)
- * To define a custom wrapping, use JoinedStream.apply(...)
- */
- def equalTo(fields: Int*): JoinedStream[I1, I2] = {
- finish(KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys(fields.toArray, type2),
- type2,
- op.input1.getExecutionEnvironment.getConfig))
- }
-
- /**
- * Creates a temporal join transformation by defining the second join key.
- * The returned transformation wrapes each joined element pair in a tuple2:
- * (first, second)
- * To define a custom wrapping, use JoinedStream.apply(...)
- */
- def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
- finish(KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2),
- type2,
- op.input1.getExecutionEnvironment.getConfig))
-
- /**
- * Creates a temporal join transformation by defining the second join key.
- * The returned transformation wrapes each joined element pair in a tuple2:
- * (first, second)
- * To define a custom wrapping, use JoinedStream.apply(...)
- */
- def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
- val keyType = implicitly[TypeInformation[K]]
- val cleanFun = op.input1.clean(fun)
- val keyExtractor = new KeySelector[I2, K] {
- def getKey(in: I2) = cleanFun(in)
- }
- finish(keyExtractor)
- }
-
- private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
- this.keys2 = keys2
- new JoinedStream[I1, I2](this, createJoinOperator())
- }
-
- private def createJoinOperator(): JavaStream[(I1, I2)] = {
-
-// val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
-// op.input1.keyBy(keys1).connect(op.input2.keyBy(keys2))
-// .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-// returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
- null
- }
- }
-
- class JoinedStream[I1, I2](
- jp: JoinPredicate[I1, I2],
- javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
-
- private val op = jp.op
-
- /**
- * Sets a wrapper for the joined elements. For each joined pair, the result of the
- * udf call will be emitted.
- */
- def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
- val cleanFun = clean(getJoinWindowFunction(jp, fun))
-
-// op.input1.keyBy(jp.keys1).connect(op.input2.keyBy(jp.keys2))
-// .addGeneralWindowCombine[R](
-// cleanFun,
-// implicitly[TypeInformation[R]],
-// op.windowSize,
-// op.slideInterval,
-// op.timeStamp1,
-// op.timeStamp2)
- null
- }
- }
-
- private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
- joinFunction: (I1, I2) => R) = {
- require(joinFunction != null, "Join function must not be null.")
-
- val cleanFun = jp.op.input1.clean(joinFunction)
-
- val joinFun = new JoinFunction[I1, I2, R] {
- override def join(first: I1, second: I2): R = {
- cleanFun(first, second)
- }
- }
-
-// new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
- null
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
deleted file mode 100644
index 8357c4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.temporal.{ TemporalOperator => JTempOp }
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-
-abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
- i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
-
- def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = {
- val timeStamp1 = getTS(ts1)
- val timeStamp2 = getTS(ts2)
- onWindow(length, timeStamp1, timeStamp2, startTime)
- }
-
- def getTS[R](ts: R => Long): Timestamp[R] = {
- val cleanFun = clean(ts)
- new Timestamp[R] {
- def getTimestamp(in: R) = cleanFun(in)
- }
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(i1.getExecutionEnvironment).scalaClean(f)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
new file mode 100644
index 0000000..7232309
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.TimestampExtractor
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.mutable
+
+class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ def testCoGroup(): Unit = { // coGroup of two tuple sources keyed on their String field
+ CoGroupJoinITCase.testResults = mutable.MutableList() // start from an empty result buffer
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ env.getConfig.enableTimestamps
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) { // emits nine fixed tuples
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+ }
+
+ def cancel() { // intentionally empty: run() only emits a fixed set of elements
+ }
+ }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor) // Int field = timestamp
+
+ val source2 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) { // emits six fixed tuples
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("b", 3))
+ ctx.collect(("c", 6))
+ ctx.collect(("c", 7))
+ ctx.collect(("c", 8))
+ }
+
+ def cancel() { // intentionally empty: run() only emits a fixed set of elements
+ }
+ }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor) // Int field = timestamp
+
+ source1.coGroup(source2)
+ .where(_._1) // key of source1: the String field
+ .equalTo(_._1) // key of source2: the String field
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
+ "F:" + first.mkString("") + " S:" + second.mkString("") // one string per invocation
+ }
+ .addSink(new SinkFunction[String]() {
+ def invoke(value: String) {
+ CoGroupJoinITCase.testResults += value // record output in the shared buffer
+ }
+ })
+
+ env.execute("CoGroup Test")
+
+ val expectedResult = mutable.MutableList( // includes groups where one side is empty
+ "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+ "F:(b,3)(b,4)(b,5) S:(b,3)",
+ "F:(a,6)(a,7)(a,8) S:",
+ "F: S:(c,6)(c,7)(c,8)")
+
+ assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) // order-insensitive
+ }
+
+ @Test
+ def testJoin(): Unit = { // join of two triple sources keyed on their first String field
+ CoGroupJoinITCase.testResults = mutable.MutableList() // start from an empty result buffer
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ env.getConfig.enableTimestamps
+
+ val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { // eight fixed triples
+ ctx.collect(("a", "x", 0))
+ ctx.collect(("a", "y", 1))
+ ctx.collect(("a", "z", 2))
+
+ ctx.collect(("b", "u", 3))
+ ctx.collect(("b", "w", 5))
+
+ ctx.collect(("a", "i", 6))
+ ctx.collect(("a", "j", 7))
+ ctx.collect(("a", "k", 8))
+ }
+
+ def cancel() { // intentionally empty: run() only emits a fixed set of elements
+ }
+ }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) // Int field = timestamp
+
+ val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { // six fixed triples
+ ctx.collect(("a", "u", 0))
+ ctx.collect(("a", "w", 1))
+
+ ctx.collect(("b", "i", 3))
+ ctx.collect(("b", "k", 5))
+
+ ctx.collect(("a", "x", 6))
+ ctx.collect(("a", "z", 8))
+ }
+
+ def cancel() { // intentionally empty: run() only emits a fixed set of elements
+ }
+ }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) // Int field = timestamp
+
+ source1.join(source2)
+ .where(_._1) // key of source1: the first String field
+ .equalTo(_._1) // key of source2: the first String field
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply( (l, r) => l.toString + ":" + r.toString) // one "left:right" string per pair
+ .addSink(new SinkFunction[String]() {
+ def invoke(value: String) {
+ CoGroupJoinITCase.testResults += value // record output in the shared buffer
+ }
+ })
+
+ env.execute("Join Test")
+
+ val expectedResult = mutable.MutableList( // 3x2 + 2x2 + 3x2 = 16 pairings
+ "(a,x,0):(a,u,0)",
+ "(a,x,0):(a,w,1)",
+ "(a,y,1):(a,u,0)",
+ "(a,y,1):(a,w,1)",
+ "(a,z,2):(a,u,0)",
+ "(a,z,2):(a,w,1)",
+ "(b,u,3):(b,i,3)",
+ "(b,u,3):(b,k,5)",
+ "(b,w,5):(b,i,3)",
+ "(b,w,5):(b,k,5)",
+ "(a,i,6):(a,x,6)",
+ "(a,i,6):(a,z,8)",
+ "(a,j,7):(a,x,6)",
+ "(a,j,7):(a,z,8)",
+ "(a,k,8):(a,x,6)",
+ "(a,k,8):(a,z,8)")
+
+ assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) // order-insensitive
+ }
+
+ @Test
+ def testSelfJoin(): Unit = { // joins a single triple source with itself
+ CoGroupJoinITCase.testResults = mutable.MutableList() // start from an empty result buffer
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ env.getConfig.enableTimestamps
+
+ val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { // eight fixed triples
+ ctx.collect(("a", "x", 0))
+ ctx.collect(("a", "y", 1))
+ ctx.collect(("a", "z", 2))
+
+ ctx.collect(("b", "u", 3))
+ ctx.collect(("b", "w", 5))
+
+ ctx.collect(("a", "i", 6))
+ ctx.collect(("a", "j", 7))
+ ctx.collect(("a", "k", 8))
+ }
+
+ def cancel() { // intentionally empty: run() only emits a fixed set of elements
+ }
+ }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) // Int field = timestamp
+
+ source1.join(source1) // the same stream is used as both join inputs
+ .where(_._1) // left key: the first String field
+ .equalTo(_._1) // right key: the first String field
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply( (l, r) => l.toString + ":" + r.toString) // one "left:right" string per pair
+ .addSink(new SinkFunction[String]() {
+ def invoke(value: String) {
+ CoGroupJoinITCase.testResults += value // record output in the shared buffer
+ }
+ })
+
+ env.execute("Self-Join Test")
+
+ val expectedResult = mutable.MutableList( // 3x3 + 2x2 + 3x3 = 22 pairings, self-pairs included
+ "(a,x,0):(a,x,0)",
+ "(a,x,0):(a,y,1)",
+ "(a,x,0):(a,z,2)",
+ "(a,y,1):(a,x,0)",
+ "(a,y,1):(a,y,1)",
+ "(a,y,1):(a,z,2)",
+ "(a,z,2):(a,x,0)",
+ "(a,z,2):(a,y,1)",
+ "(a,z,2):(a,z,2)",
+ "(b,u,3):(b,u,3)",
+ "(b,u,3):(b,w,5)",
+ "(b,w,5):(b,u,3)",
+ "(b,w,5):(b,w,5)",
+ "(a,i,6):(a,i,6)",
+ "(a,i,6):(a,j,7)",
+ "(a,i,6):(a,k,8)",
+ "(a,j,7):(a,i,6)",
+ "(a,j,7):(a,j,7)",
+ "(a,j,7):(a,k,8)",
+ "(a,k,8):(a,i,6)",
+ "(a,k,8):(a,j,7)",
+ "(a,k,8):(a,k,8)")
+
+ assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) // order-insensitive
+ }
+
+}
+
+
+object CoGroupJoinITCase { // shared state and timestamp extractors for the tests above
+ private var testResults: mutable.MutableList[String] = null // reassigned at each test start
+
+ private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
+ def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
+ element._2 // the Int field is used as the timestamp; currentTimestamp is ignored
+ }
+
+ def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+ element._2 - 1 // one less than the timestamp extracted for this element
+ }
+
+ def getCurrentWatermark: Long = {
+ Long.MinValue // NOTE(review): presumably "no watermark from this hook" - confirm in API docs
+ }
+ }
+
+ private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] {
+ def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = {
+ element._3 // the Int field is used as the timestamp; currentTimestamp is ignored
+ }
+
+ def emitWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
+ element._3 - 1 // one less than the timestamp extracted for this element
+ }
+
+ def getCurrentWatermark: Long = {
+ Long.MinValue // NOTE(review): presumably "no watermark from this hook" - confirm in API docs
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index d1fd233..c6bd87a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
import java.lang.reflect.Method
import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, JoinedStreams}
import scala.language.existentials
@@ -138,14 +138,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
classOf[KeyedStream[_, _]])
checkMethods(
- "StreamJoinOperator", "StreamJoinOperator",
- classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
- classOf[StreamJoinOperator[_,_]])
+ "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
+ classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
+ classOf[JoinedStreams.WithWindow[_,_,_,_]])
checkMethods(
- "TemporalOperator", "TemporalOperator",
- classOf[org.apache.flink.streaming.api.datastream.temporal.TemporalOperator[_,_,_]],
- classOf[TemporalOperator[_,_,_]])
+ "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
+ classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
+ classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
checkMethods(
"WindowedDataStream", "WindowedDataStream",