You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:56 UTC
[7/8] flink git commit: Move CoGroupJoinITCase to windowing package
Move CoGroupJoinITCase to windowing package
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce792b11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce792b11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce792b11
Branch: refs/heads/master
Commit: ce792b11a4a2b8d7b02a59dcadcc4c052ff5531a
Parents: ff367d6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 7 16:18:24 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/CoGroupJoinITCase.java | 372 ------------------
.../operators/windowing/CoGroupJoinITCase.java | 373 +++++++++++++++++++
2 files changed, 373 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce792b11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
deleted file mode 100644
index 9ddd6eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
-
- private static List<String> testResults;
-
- @Test
- public void testCoGroup() throws Exception {
-
- testResults = Lists.newArrayList();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
-
- DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
- ctx.collect(Tuple2.of("a", 0));
- ctx.collect(Tuple2.of("a", 1));
- ctx.collect(Tuple2.of("a", 2));
-
- ctx.collect(Tuple2.of("b", 3));
- ctx.collect(Tuple2.of("b", 4));
- ctx.collect(Tuple2.of("b", 5));
-
- ctx.collect(Tuple2.of("a", 6));
- ctx.collect(Tuple2.of("a", 7));
- ctx.collect(Tuple2.of("a", 8));
- }
-
- @Override
- public void cancel() {
- }
- }).extractTimestamp(new Tuple2TimestampExtractor());
-
- DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
- ctx.collect(Tuple2.of("a", 0));
- ctx.collect(Tuple2.of("a", 1));
-
- ctx.collect(Tuple2.of("b", 3));
-
- ctx.collect(Tuple2.of("c", 6));
- ctx.collect(Tuple2.of("c", 7));
- ctx.collect(Tuple2.of("c", 8));
- }
-
- @Override
- public void cancel() {
- }
- }).extractTimestamp(new Tuple2TimestampExtractor());
-
-
- source1.coGroup(source2)
- .where(new Tuple2KeyExtractor())
- .equalTo(new Tuple2KeyExtractor())
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
- @Override
- public void coGroup(Iterable<Tuple2<String, Integer>> first,
- Iterable<Tuple2<String, Integer>> second,
- Collector<String> out) throws Exception {
- StringBuilder result = new StringBuilder();
- result.append("F:");
- for (Tuple2<String, Integer> t: first) {
- result.append(t.toString());
- }
- result.append(" S:");
- for (Tuple2<String, Integer> t: second) {
- result.append(t.toString());
- }
- out.collect(result.toString());
- }
- })
- .addSink(new SinkFunction<String>() {
- @Override
- public void invoke(String value) throws Exception {
- testResults.add(value);
- }
- });
-
- env.execute("CoGroup Test");
-
- List<String> expectedResult = Lists.newArrayList(
- "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
- "F:(b,3)(b,4)(b,5) S:(b,3)",
- "F:(a,6)(a,7)(a,8) S:",
- "F: S:(c,6)(c,7)(c,8)");
-
- Collections.sort(expectedResult);
- Collections.sort(testResults);
-
- Assert.assertEquals(expectedResult, testResults);
- }
-
- @Test
- public void testJoin() throws Exception {
-
- testResults = Lists.newArrayList();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
-
- DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
- ctx.collect(Tuple3.of("a", "x", 0));
- ctx.collect(Tuple3.of("a", "y", 1));
- ctx.collect(Tuple3.of("a", "z", 2));
-
- ctx.collect(Tuple3.of("b", "u", 3));
- ctx.collect(Tuple3.of("b", "w", 5));
-
- ctx.collect(Tuple3.of("a", "i", 6));
- ctx.collect(Tuple3.of("a", "j", 7));
- ctx.collect(Tuple3.of("a", "k", 8));
- }
-
- @Override
- public void cancel() {
- }
- }).extractTimestamp(new Tuple3TimestampExtractor());
-
- DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
- ctx.collect(Tuple3.of("a", "u", 0));
- ctx.collect(Tuple3.of("a", "w", 1));
-
- ctx.collect(Tuple3.of("b", "i", 3));
- ctx.collect(Tuple3.of("b", "k", 5));
-
- ctx.collect(Tuple3.of("a", "x", 6));
- ctx.collect(Tuple3.of("a", "z", 8));
- }
-
- @Override
- public void cancel() {
- }
- }).extractTimestamp(new Tuple3TimestampExtractor());
-
-
- source1.join(source2)
- .where(new Tuple3KeyExtractor())
- .equalTo(new Tuple3KeyExtractor())
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
- @Override
- public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
- return first + ":" + second;
- }
- })
- .addSink(new SinkFunction<String>() {
- @Override
- public void invoke(String value) throws Exception {
- testResults.add(value);
- }
- });
-
- env.execute("Join Test");
-
- List<String> expectedResult = Lists.newArrayList(
- "(a,x,0):(a,u,0)",
- "(a,x,0):(a,w,1)",
- "(a,y,1):(a,u,0)",
- "(a,y,1):(a,w,1)",
- "(a,z,2):(a,u,0)",
- "(a,z,2):(a,w,1)",
- "(b,u,3):(b,i,3)",
- "(b,u,3):(b,k,5)",
- "(b,w,5):(b,i,3)",
- "(b,w,5):(b,k,5)",
- "(a,i,6):(a,x,6)",
- "(a,i,6):(a,z,8)",
- "(a,j,7):(a,x,6)",
- "(a,j,7):(a,z,8)",
- "(a,k,8):(a,x,6)",
- "(a,k,8):(a,z,8)");
-
- Collections.sort(expectedResult);
- Collections.sort(testResults);
-
- Assert.assertEquals(expectedResult, testResults);
- }
-
- @Test
- public void testSelfJoin() throws Exception {
-
- testResults = Lists.newArrayList();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
-
- DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
- ctx.collect(Tuple3.of("a", "x", 0));
- ctx.collect(Tuple3.of("a", "y", 1));
- ctx.collect(Tuple3.of("a", "z", 2));
-
- ctx.collect(Tuple3.of("b", "u", 3));
- ctx.collect(Tuple3.of("b", "w", 5));
-
- ctx.collect(Tuple3.of("a", "i", 6));
- ctx.collect(Tuple3.of("a", "j", 7));
- ctx.collect(Tuple3.of("a", "k", 8));
- }
-
- @Override
- public void cancel() {
- }
- }).extractTimestamp(new Tuple3TimestampExtractor());
-
- source1.join(source1)
- .where(new Tuple3KeyExtractor())
- .equalTo(new Tuple3KeyExtractor())
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
- @Override
- public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
- return first + ":" + second;
- }
- })
- .addSink(new SinkFunction<String>() {
- @Override
- public void invoke(String value) throws Exception {
- testResults.add(value);
- }
- });
-
- env.execute("Self-Join Test");
-
- List<String> expectedResult = Lists.newArrayList(
- "(a,x,0):(a,x,0)",
- "(a,x,0):(a,y,1)",
- "(a,x,0):(a,z,2)",
- "(a,y,1):(a,x,0)",
- "(a,y,1):(a,y,1)",
- "(a,y,1):(a,z,2)",
- "(a,z,2):(a,x,0)",
- "(a,z,2):(a,y,1)",
- "(a,z,2):(a,z,2)",
- "(b,u,3):(b,u,3)",
- "(b,u,3):(b,w,5)",
- "(b,w,5):(b,u,3)",
- "(b,w,5):(b,w,5)",
- "(a,i,6):(a,i,6)",
- "(a,i,6):(a,j,7)",
- "(a,i,6):(a,k,8)",
- "(a,j,7):(a,i,6)",
- "(a,j,7):(a,j,7)",
- "(a,j,7):(a,k,8)",
- "(a,k,8):(a,i,6)",
- "(a,k,8):(a,j,7)",
- "(a,k,8):(a,k,8)");
-
- Collections.sort(expectedResult);
- Collections.sort(testResults);
-
- Assert.assertEquals(expectedResult, testResults);
- }
-
- private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
- return element.f1;
- }
-
- @Override
- public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
- return element.f1 - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
- }
-
- private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) {
- return element.f2;
- }
-
- @Override
- public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
- return element.f2 - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
- }
-
- private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }
-
- private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple3<String, String, Integer> value) throws Exception {
- return value.f0;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce792b11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
new file mode 100644
index 0000000..bb79e5e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -0,0 +1,373 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+ private static List<String> testResults;
+
+ @Test
+ public void testCoGroup() throws Exception {
+
+ testResults = Lists.newArrayList();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple2.of("a", 0));
+ ctx.collect(Tuple2.of("a", 1));
+ ctx.collect(Tuple2.of("a", 2));
+
+ ctx.collect(Tuple2.of("b", 3));
+ ctx.collect(Tuple2.of("b", 4));
+ ctx.collect(Tuple2.of("b", 5));
+
+ ctx.collect(Tuple2.of("a", 6));
+ ctx.collect(Tuple2.of("a", 7));
+ ctx.collect(Tuple2.of("a", 8));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).extractTimestamp(new Tuple2TimestampExtractor());
+
+ DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple2.of("a", 0));
+ ctx.collect(Tuple2.of("a", 1));
+
+ ctx.collect(Tuple2.of("b", 3));
+
+ ctx.collect(Tuple2.of("c", 6));
+ ctx.collect(Tuple2.of("c", 7));
+ ctx.collect(Tuple2.of("c", 8));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).extractTimestamp(new Tuple2TimestampExtractor());
+
+
+ source1.coGroup(source2)
+ .where(new Tuple2KeyExtractor())
+ .equalTo(new Tuple2KeyExtractor())
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<String, Integer>> first,
+ Iterable<Tuple2<String, Integer>> second,
+ Collector<String> out) throws Exception {
+ StringBuilder result = new StringBuilder();
+ result.append("F:");
+ for (Tuple2<String, Integer> t: first) {
+ result.append(t.toString());
+ }
+ result.append(" S:");
+ for (Tuple2<String, Integer> t: second) {
+ result.append(t.toString());
+ }
+ out.collect(result.toString());
+ }
+ })
+ .addSink(new SinkFunction<String>() {
+ @Override
+ public void invoke(String value) throws Exception {
+ testResults.add(value);
+ }
+ });
+
+ env.execute("CoGroup Test");
+
+ List<String> expectedResult = Lists.newArrayList(
+ "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+ "F:(b,3)(b,4)(b,5) S:(b,3)",
+ "F:(a,6)(a,7)(a,8) S:",
+ "F: S:(c,6)(c,7)(c,8)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+
+ testResults = Lists.newArrayList();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple3.of("a", "x", 0));
+ ctx.collect(Tuple3.of("a", "y", 1));
+ ctx.collect(Tuple3.of("a", "z", 2));
+
+ ctx.collect(Tuple3.of("b", "u", 3));
+ ctx.collect(Tuple3.of("b", "w", 5));
+
+ ctx.collect(Tuple3.of("a", "i", 6));
+ ctx.collect(Tuple3.of("a", "j", 7));
+ ctx.collect(Tuple3.of("a", "k", 8));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).extractTimestamp(new Tuple3TimestampExtractor());
+
+ DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple3.of("a", "u", 0));
+ ctx.collect(Tuple3.of("a", "w", 1));
+
+ ctx.collect(Tuple3.of("b", "i", 3));
+ ctx.collect(Tuple3.of("b", "k", 5));
+
+ ctx.collect(Tuple3.of("a", "x", 6));
+ ctx.collect(Tuple3.of("a", "z", 8));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).extractTimestamp(new Tuple3TimestampExtractor());
+
+
+ source1.join(source2)
+ .where(new Tuple3KeyExtractor())
+ .equalTo(new Tuple3KeyExtractor())
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+ @Override
+ public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+ return first + ":" + second;
+ }
+ })
+ .addSink(new SinkFunction<String>() {
+ @Override
+ public void invoke(String value) throws Exception {
+ testResults.add(value);
+ }
+ });
+
+ env.execute("Join Test");
+
+ List<String> expectedResult = Lists.newArrayList(
+ "(a,x,0):(a,u,0)",
+ "(a,x,0):(a,w,1)",
+ "(a,y,1):(a,u,0)",
+ "(a,y,1):(a,w,1)",
+ "(a,z,2):(a,u,0)",
+ "(a,z,2):(a,w,1)",
+ "(b,u,3):(b,i,3)",
+ "(b,u,3):(b,k,5)",
+ "(b,w,5):(b,i,3)",
+ "(b,w,5):(b,k,5)",
+ "(a,i,6):(a,x,6)",
+ "(a,i,6):(a,z,8)",
+ "(a,j,7):(a,x,6)",
+ "(a,j,7):(a,z,8)",
+ "(a,k,8):(a,x,6)",
+ "(a,k,8):(a,z,8)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
+ @Test
+ public void testSelfJoin() throws Exception {
+
+ testResults = Lists.newArrayList();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple3.of("a", "x", 0));
+ ctx.collect(Tuple3.of("a", "y", 1));
+ ctx.collect(Tuple3.of("a", "z", 2));
+
+ ctx.collect(Tuple3.of("b", "u", 3));
+ ctx.collect(Tuple3.of("b", "w", 5));
+
+ ctx.collect(Tuple3.of("a", "i", 6));
+ ctx.collect(Tuple3.of("a", "j", 7));
+ ctx.collect(Tuple3.of("a", "k", 8));
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).extractTimestamp(new Tuple3TimestampExtractor());
+
+ source1.join(source1)
+ .where(new Tuple3KeyExtractor())
+ .equalTo(new Tuple3KeyExtractor())
+ .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+ @Override
+ public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+ return first + ":" + second;
+ }
+ })
+ .addSink(new SinkFunction<String>() {
+ @Override
+ public void invoke(String value) throws Exception {
+ testResults.add(value);
+ }
+ });
+
+ env.execute("Self-Join Test");
+
+ List<String> expectedResult = Lists.newArrayList(
+ "(a,x,0):(a,x,0)",
+ "(a,x,0):(a,y,1)",
+ "(a,x,0):(a,z,2)",
+ "(a,y,1):(a,x,0)",
+ "(a,y,1):(a,y,1)",
+ "(a,y,1):(a,z,2)",
+ "(a,z,2):(a,x,0)",
+ "(a,z,2):(a,y,1)",
+ "(a,z,2):(a,z,2)",
+ "(b,u,3):(b,u,3)",
+ "(b,u,3):(b,w,5)",
+ "(b,w,5):(b,u,3)",
+ "(b,w,5):(b,w,5)",
+ "(a,i,6):(a,i,6)",
+ "(a,i,6):(a,j,7)",
+ "(a,i,6):(a,k,8)",
+ "(a,j,7):(a,i,6)",
+ "(a,j,7):(a,j,7)",
+ "(a,j,7):(a,k,8)",
+ "(a,k,8):(a,i,6)",
+ "(a,k,8):(a,j,7)",
+ "(a,k,8):(a,k,8)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
+ private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
+ return element.f1;
+ }
+
+ @Override
+ public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+ return element.f1 - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
+ }
+ }
+
+ private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) {
+ return element.f2;
+ }
+
+ @Override
+ public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
+ return element.f2 - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
+ }
+ }
+
+ private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, Integer> value) throws Exception {
+ return value.f0;
+ }
+ }
+
+ private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple3<String, String, Integer> value) throws Exception {
+ return value.f0;
+ }
+ }
+
+}