You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/04 23:22:44 UTC

[2/2] git commit: [FLINK-1174] [streaming] Added window join operator

[FLINK-1174] [streaming] Added window join operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/58f7d303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/58f7d303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/58f7d303

Branch: refs/heads/master
Commit: 58f7d303866fad6af4b23d2695d6273ef74b6a1d
Parents: 233161b
Author: ghermann <re...@gmail.com>
Authored: Mon Oct 27 19:04:50 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  43 ++++--
 .../streaming/api/datastream/DataStream.java    |  54 ++++++-
 .../api/function/co/JoinWindowFunction.java     |  55 +++++++
 .../streaming/api/WindowCrossJoinTest.java      | 152 +++++++++++++++++++
 4 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index fafb169..33dbc7e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
@@ -43,9 +44,9 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.ClassTypeWrapper;
 import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
@@ -504,6 +505,31 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize,
 			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
+		CrossWindowFunction<IN1, IN2> crossFn = new CrossWindowFunction<IN1, IN2>();
+		return addGeneralWindowJoin(crossFn, windowSize, slideInterval, timestamp1, timestamp2);
+	}
+
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			int fieldIn1, int fieldIn2) {
+		TimeStamp<IN1> stamp1 = new DefaultTimeStamp<IN1>(); // no user-defined stamps given
+		TimeStamp<IN2> stamp2 = new DefaultTimeStamp<IN2>();
+		return windowJoin(windowSize, slideInterval, stamp1, stamp2, fieldIn1, fieldIn2);
+	}
+
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+		// Build the join function from both inputs' types and the field positions to match,
+		// then pass it to the shared window-join wiring.
+		return addGeneralWindowJoin(
+				new JoinWindowFunction<IN1, IN2>(dataStream1.getOutputType(),
+						dataStream2.getOutputType(), fieldIn1, fieldIn2),
+				windowSize, slideInterval, timestamp1, timestamp2);
+	}
+
+	private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
+			CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
+			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
 		}
@@ -514,16 +540,17 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeWrapper<IN1> in1TypeWrapper = null;
 		TypeWrapper<IN2> in2TypeWrapper = null;
 
-		in1TypeWrapper = new ClassTypeWrapper<IN1>(dataStream1.getOutputType().getTypeClass());
-		in2TypeWrapper = new ClassTypeWrapper<IN2>(dataStream2.getOutputType().getTypeClass());
+		in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType().createSerializer()
+				.createInstance());
+		in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType().createSerializer()
+				.createInstance());
 
 		CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
 				in1TypeWrapper, in2TypeWrapper);
 
-		return addCoFunction("coWindowReduce", new CrossWindowFunction<IN1, IN2>(), in1TypeWrapper,
-				in2TypeWrapper, outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(
-						new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval, timestamp1,
-						timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(coWindowFunction,
+						windowSize, slideInterval, timestamp1, timestamp2));
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
@@ -547,8 +574,6 @@ public class ConnectedDataStream<IN1, IN2> {
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
 
-		// TODO consider iteration
-
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 98058df..c777003 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -318,7 +318,7 @@ public class DataStream<OUT> {
 	 * Creates a cross (Cartesian product) of a data stream window.
 	 * 
 	 * @param dataStreamToCross
-	 *            {@link DataStream} to cross with
+	 *            {@link DataStream} to cross with.
 	 * @param windowSize
 	 *            Size of the windows that will be aligned for both streams in
 	 *            milliseconds.
@@ -339,6 +339,58 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Joins this data stream with another one over aligned windows, pairing the
+	 * elements whose given fields match. Both inputs use a {@link DefaultTimeStamp}.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows aligned for both streams, in milliseconds.
+	 * @param slideInterval
+	 *            Interval by which the windows are slid after every function call.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
+			int fieldIn2) {
+		// Delegate to the time-stamped overload with a default time stamp per input.
+		return windowJoin(dataStreamToJoin, windowSize, slideInterval, new DefaultTimeStamp<OUT>(),
+				new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	}
+
+	/**
+	 * Joins this data stream with another one over aligned windows, pairing the
+	 * elements whose given fields match, using user defined time stamps.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows aligned for both streams, in milliseconds.
+	 * @param slideInterval
+	 *            Interval by which the windows are slid after every function call.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+		ConnectedDataStream<OUT, IN2> connected = connect(dataStreamToJoin);
+		return connected.windowJoin(windowSize, slideInterval, timestamp1, timestamp2, fieldIn1,
+				fieldIn2);
+	}
+
+	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are partitioned by the hashcodes of the selected fields.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
new file mode 100644
index 0000000..3dea04d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.util.Collector;
+
+/** Window join that emits every pair of elements whose selected key fields are equal. */
+public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
+	private static final long serialVersionUID = 1L;
+	private KeySelector<IN1, Object> keySelector1;
+	private KeySelector<IN2, Object> keySelector2;
+
+	public JoinWindowFunction() {
+	}
+	public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+			int positionIn1, int positionIn2) {
+		keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
+		keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+	}
+
+	@Override
+	public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
+			throws Exception {
+		for (IN1 item1 : first) {
+			Object key1 = keySelector1.getKey(item1); // once per item; a null key never matches
+			for (IN2 item2 : second) {
+				if (key1 != null && key1.equals(keySelector2.getKey(item2))) {
+					out.collect(new Tuple2<IN1, IN2>(item1, item2));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
new file mode 100644
index 0000000..3a9eedd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+public class WindowCrossJoinTest {
+	private static final long MEMORYSIZE = 32;
+	// Results are appended by the sink classes below; expectations are filled in test().
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+	@Test
+	public void test() throws Exception {
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		env.setBufferTimeout(1);
+
+		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
+		ArrayList<Integer> in2 = new ArrayList<Integer>();
+
+		in1.add(new Tuple2<Integer, String>(10, "a"));
+		in1.add(new Tuple2<Integer, String>(20, "b"));
+		in1.add(new Tuple2<Integer, String>(20, "x"));
+		in1.add(new Tuple2<Integer, String>(0, "y"));
+
+		in2.add(0);
+		in2.add(5);
+		in2.add(20);
+		// Join on position 0 of both inputs: only the values 20 and 0 occur in both.
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 20));
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 20));
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 0));
+		// Cross: every element of in1 paired with every element of in2 (4 x 3 = 12 pairs).
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 20));
+
+		DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
+		DataStream<Integer> inStream2 = env.fromCollection(in2);
+
+		inStream1.windowJoin(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2(), 0, 0)
+				.addSink(new JoinResultSink());
+
+		inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2())
+			.addSink(new CrossResultSink());
+
+		env.executeTest(MEMORYSIZE);
+		
+		assertEquals(joinExpectedResults, joinResults);
+		assertEquals(crossExpectedResults, crossResults);
+	}
+	/** Gives every element of the first input the time stamp 101; the start time is 100. */
+	private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Tuple2<Integer, String> value) {
+			return 101L;
+		}
+
+		@Override
+		public long getStartTime() {
+			return 100L;
+		}
+	}
+	/** Gives every element of the second input the time stamp 101; the start time is 100. */
+	private static class MyTimestamp2 implements TimeStamp<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return 101L;
+		}
+
+		@Override
+		public long getStartTime() {
+			return 100L;
+		}
+	}
+	/** Sink that appends every received pair to the static joinResults list. */
+	private static class JoinResultSink implements
+			SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+			joinResults.add(value);
+		}
+	}
+	/** Sink that appends every received pair to the static crossResults list. */
+	private static class CrossResultSink implements
+			SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+			crossResults.add(value);
+		}
+	}
+}