You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/04 23:22:44 UTC
[2/2] git commit: [FLINK-1174] [streaming] Added window join operator
[FLINK-1174] [streaming] Added window join operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/58f7d303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/58f7d303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/58f7d303
Branch: refs/heads/master
Commit: 58f7d303866fad6af4b23d2695d6273ef74b6a1d
Parents: 233161b
Author: ghermann <re...@gmail.com>
Authored: Mon Oct 27 19:04:50 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 43 ++++--
.../streaming/api/datastream/DataStream.java | 54 ++++++-
.../api/function/co/JoinWindowFunction.java | 55 +++++++
.../streaming/api/WindowCrossJoinTest.java | 152 +++++++++++++++++++
4 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index fafb169..33dbc7e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
@@ -43,9 +44,9 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.ClassTypeWrapper;
import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
@@ -504,6 +505,31 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ return addGeneralWindowJoin(new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval,
+ timestamp1, timestamp2);
+ }
+
+    /**
+     * Joins the two connected streams over aligned windows, using a
+     * {@link DefaultTimeStamp} for each input. Delegates to the overload that
+     * takes explicit {@link TimeStamp}s.
+     *
+     * @param windowSize
+     *            Size of the windows (rejected downstream if smaller than 1).
+     * @param slideInterval
+     *            Interval by which the windows are slid.
+     * @param fieldIn1
+     *            Position of the field to match in the first input.
+     * @param fieldIn2
+     *            Position of the field to match in the second input.
+     * @return The operator emitting the matched pairs.
+     */
+    SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+            int fieldIn1, int fieldIn2) {
+        TimeStamp<IN1> defaultStamp1 = new DefaultTimeStamp<IN1>();
+        TimeStamp<IN2> defaultStamp2 = new DefaultTimeStamp<IN2>();
+
+        return windowJoin(windowSize, slideInterval, defaultStamp1, defaultStamp2, fieldIn1,
+                fieldIn2);
+    }
+
+    /**
+     * Joins the two connected streams over aligned windows with user supplied
+     * time stamps. A {@link JoinWindowFunction} is built from the output types
+     * of both streams and the two field positions, then registered through
+     * addGeneralWindowJoin.
+     *
+     * @param windowSize
+     *            Size of the windows (rejected downstream if smaller than 1).
+     * @param slideInterval
+     *            Interval by which the windows are slid.
+     * @param timestamp1
+     *            Time stamp extractor for the first input.
+     * @param timestamp2
+     *            Time stamp extractor for the second input.
+     * @param fieldIn1
+     *            Position of the field to match in the first input.
+     * @param fieldIn2
+     *            Position of the field to match in the second input.
+     * @return The operator emitting the matched pairs.
+     */
+    SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+            TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+        return addGeneralWindowJoin(
+                new JoinWindowFunction<IN1, IN2>(dataStream1.getOutputType(),
+                        dataStream2.getOutputType(), fieldIn1, fieldIn2),
+                windowSize, slideInterval, timestamp1, timestamp2);
+    }
+
+ private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
+ CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
+ long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
@@ -514,16 +540,17 @@ public class ConnectedDataStream<IN1, IN2> {
TypeWrapper<IN1> in1TypeWrapper = null;
TypeWrapper<IN2> in2TypeWrapper = null;
- in1TypeWrapper = new ClassTypeWrapper<IN1>(dataStream1.getOutputType().getTypeClass());
- in2TypeWrapper = new ClassTypeWrapper<IN2>(dataStream2.getOutputType().getTypeClass());
+ in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType().createSerializer()
+ .createInstance());
+ in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType().createSerializer()
+ .createInstance());
CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
in1TypeWrapper, in2TypeWrapper);
- return addCoFunction("coWindowReduce", new CrossWindowFunction<IN1, IN2>(), in1TypeWrapper,
- in2TypeWrapper, outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(
- new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval, timestamp1,
- timestamp2));
+ return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(coWindowFunction,
+ windowSize, slideInterval, timestamp1, timestamp2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
@@ -547,8 +574,6 @@ public class ConnectedDataStream<IN1, IN2> {
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
- // TODO consider iteration
-
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 98058df..c777003 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -318,7 +318,7 @@ public class DataStream<OUT> {
* Creates a cross (Cartesian product) of a data stream window.
*
* @param dataStreamToCross
- * {@link DataStream} to cross with
+ * {@link DataStream} to cross with.
* @param windowSize
* Size of the windows that will be aligned for both streams in
* milliseconds.
@@ -339,6 +339,58 @@ public class DataStream<OUT> {
}
/**
+     * Joins this data stream with another one window by window: elements of
+     * the two aligned windows are paired when the selected field of the first
+     * equals the selected field of the second. A {@link DefaultTimeStamp} is
+     * used for both inputs.
+     *
+     * @param dataStreamToJoin
+     *            {@link DataStream} to join with.
+     * @param windowSize
+     *            Size of the windows that will be aligned for both streams in
+     *            milliseconds.
+     * @param slideInterval
+     *            After every function call the windows will be slid by this
+     *            interval.
+     * @param fieldIn1
+     *            Position of the field of this stream to be matched.
+     * @param fieldIn2
+     *            Position of the field of the joined stream to be matched.
+     * @return The transformed {@link DataStream} of matched pairs.
+     */
+    public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+            DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
+            int fieldIn2) {
+        TimeStamp<OUT> defaultStamp1 = new DefaultTimeStamp<OUT>();
+        TimeStamp<IN2> defaultStamp2 = new DefaultTimeStamp<IN2>();
+
+        return windowJoin(dataStreamToJoin, windowSize, slideInterval, defaultStamp1,
+                defaultStamp2, fieldIn1, fieldIn2);
+    }
+
+    /**
+     * Joins this data stream with another one window by window, using user
+     * defined time stamps: elements of the two aligned windows are paired when
+     * the selected field of the first equals the selected field of the second.
+     *
+     * @param dataStreamToJoin
+     *            {@link DataStream} to join with.
+     * @param windowSize
+     *            Size of the windows that will be aligned for both streams in
+     *            milliseconds.
+     * @param slideInterval
+     *            After every function call the windows will be slid by this
+     *            interval.
+     * @param timestamp1
+     *            User defined time stamps for this stream.
+     * @param timestamp2
+     *            User defined time stamps for the joined stream.
+     * @param fieldIn1
+     *            Position of the field of this stream to be matched.
+     * @param fieldIn2
+     *            Position of the field of the joined stream to be matched.
+     * @return The transformed {@link DataStream} of matched pairs.
+     */
+    public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+            DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+            TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+        // Connect first, then let the connected stream build the join operator.
+        ConnectedDataStream<OUT, IN2> connected = this.connect(dataStreamToJoin);
+
+        return connected.windowJoin(windowSize, slideInterval, timestamp1, timestamp2, fieldIn1,
+                fieldIn2);
+    }
+
+ /**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are partitioned by the hashcodes of the selected fields.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
new file mode 100644
index 0000000..3dea04d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link CoWindowFunction} that joins two windows on equal keys: every pair of
+ * elements (one from each window) whose extracted keys are equal is emitted
+ * as a {@link Tuple2}.
+ * <p>
+ * Keys are extracted by {@link FieldsKeySelector}s created from the type
+ * information of each input and one field position per input.
+ */
+public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
+    private static final long serialVersionUID = 1L;
+
+    private KeySelector<IN1, Object> keySelector1;
+    private KeySelector<IN2, Object> keySelector2;
+
+    /**
+     * Creates a function that leaves both key selectors unset.
+     */
+    public JoinWindowFunction() {
+    }
+
+    /**
+     * Creates a join function matching one field of each input.
+     *
+     * @param inType1
+     *            Type information of the first input.
+     * @param inType2
+     *            Type information of the second input.
+     * @param positionIn1
+     *            Position of the key field in the first input.
+     * @param positionIn2
+     *            Position of the key field in the second input.
+     */
+    public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+            int positionIn1, int positionIn2) {
+        keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
+        keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+    }
+
+    /**
+     * Emits every pair of the two windows whose keys are equal. Pairs are
+     * produced in the iteration order of {@code first}, and for each of its
+     * elements in the iteration order of {@code second}.
+     *
+     * @param first
+     *            Elements of the first input's window.
+     * @param second
+     *            Elements of the second input's window.
+     * @param out
+     *            Collector receiving the matched pairs.
+     * @throws Exception
+     *             If a key selector fails to extract a key.
+     */
+    @Override
+    public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
+            throws Exception {
+        // Nothing can match; also keeps key extraction from running needlessly.
+        if (first.isEmpty() || second.isEmpty()) {
+            return;
+        }
+
+        for (IN1 item1 : first) {
+            // The left key does not depend on the inner loop: extract it once.
+            Object key1 = keySelector1.getKey(item1);
+            if (key1 == null) {
+                // A null key on the right side never matches (equals(null) is
+                // false); treat a null key on the left side the same way
+                // instead of failing with a NullPointerException.
+                continue;
+            }
+            for (IN2 item2 : second) {
+                if (key1.equals(keySelector2.getKey(item2))) {
+                    out.collect(new Tuple2<IN1, IN2>(item1, item2));
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
new file mode 100644
index 0000000..3a9eedd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+/**
+ * Runs windowJoin and windowCross of {@link DataStream} on two small
+ * collections and compares the collected output with the expected pairs.
+ * Both time stamp extractors assign the same time stamp (101) to every
+ * element.
+ */
+public class WindowCrossJoinTest {
+    private static final long MEMORYSIZE = 32;
+
+    // First input as parallel arrays: element i is (LEFT_KEYS[i], LEFT_NAMES[i]).
+    private static final int[] LEFT_KEYS = { 10, 20, 20, 0 };
+    private static final String[] LEFT_NAMES = { "a", "b", "x", "y" };
+
+    // Second input.
+    private static final int[] RIGHT_VALUES = { 0, 5, 20 };
+
+    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+    /**
+     * Builds one output record: ((key, name), right).
+     */
+    private static Tuple2<Tuple2<Integer, String>, Integer> pair(int key, String name, int right) {
+        Tuple2<Integer, String> left = new Tuple2<Integer, String>(key, name);
+        return new Tuple2<Tuple2<Integer, String>, Integer>(left, right);
+    }
+
+    @Test
+    public void test() throws Exception {
+        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+        env.setBufferTimeout(1);
+
+        ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
+        for (int i = 0; i < LEFT_KEYS.length; i++) {
+            in1.add(new Tuple2<Integer, String>(LEFT_KEYS[i], LEFT_NAMES[i]));
+        }
+
+        ArrayList<Integer> in2 = new ArrayList<Integer>();
+        for (int value : RIGHT_VALUES) {
+            in2.add(value);
+        }
+
+        // Join on field 0 of both inputs: only keys 20 and 0 have a partner.
+        joinExpectedResults.add(pair(20, "b", 20));
+        joinExpectedResults.add(pair(20, "x", 20));
+        joinExpectedResults.add(pair(0, "y", 0));
+
+        // Cross: every left element paired with every right element, left-major.
+        for (int i = 0; i < LEFT_KEYS.length; i++) {
+            for (int value : RIGHT_VALUES) {
+                crossExpectedResults.add(pair(LEFT_KEYS[i], LEFT_NAMES[i], value));
+            }
+        }
+
+        DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
+        DataStream<Integer> inStream2 = env.fromCollection(in2);
+
+        inStream1.windowJoin(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2(), 0, 0)
+                .addSink(new JoinResultSink());
+
+        inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2())
+                .addSink(new CrossResultSink());
+
+        env.executeTest(MEMORYSIZE);
+
+        assertEquals(joinExpectedResults, joinResults);
+        assertEquals(crossExpectedResults, crossResults);
+    }
+
+    /** Constant time stamp (101, start time 100) for the tuple input. */
+    private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public long getTimestamp(Tuple2<Integer, String> value) {
+            return 101L;
+        }
+
+        @Override
+        public long getStartTime() {
+            return 100L;
+        }
+    }
+
+    /** Constant time stamp (101, start time 100) for the integer input. */
+    private static class MyTimestamp2 implements TimeStamp<Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public long getTimestamp(Integer value) {
+            return 101L;
+        }
+
+        @Override
+        public long getStartTime() {
+            return 100L;
+        }
+    }
+
+    /** Sink appending every joined record to {@code joinResults}. */
+    private static class JoinResultSink implements
+            SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+            joinResults.add(value);
+        }
+    }
+
+    /** Sink appending every crossed record to {@code crossResults}. */
+    private static class CrossResultSink implements
+            SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+            crossResults.add(value);
+        }
+    }
+}