Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/04 23:22:43 UTC

[1/2] git commit: [streaming] Enhanced BufferTimeout functionality for StreamRecordWriter

Repository: incubator-flink
Updated Branches:
  refs/heads/master 233161b25 -> 5e9a45491


[streaming] Enhanced BufferTimeout functionality for StreamRecordWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5e9a4549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5e9a4549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5e9a4549

Branch: refs/heads/master
Commit: 5e9a4549161c45c85434347d6a2f256953e9883b
Parents: 58f7d30
Author: mbalassi <ba...@gmail.com>
Authored: Tue Nov 4 12:14:33 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 31 +++++++++++++-------
 .../api/streamvertex/OutputHandler.java         | 26 +++++++++++-----
 .../flink/streaming/io/StreamRecordWriter.java  | 14 +++++++--
 3 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4d34217..4b7afbb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -59,7 +59,7 @@ public abstract class StreamExecutionEnvironment {
 
 	private int executionParallelism = -1;
 
-	private long buffertimeout = 0;;
+	private long bufferTimeout = 100;
 
 	protected JobGraphBuilder jobGraphBuilder;
 
@@ -112,23 +112,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffers. By default the output buffers flush only when they are full.
+	 * Sets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. By default the output buffers flush frequently to provide
+	 * low latency and to aid a smooth developer experience. Setting the parameter
+	 * can result in three logical modes:
+	 * 
+	 * <ul>
+	 * <li>
+	 * A positive integer triggers flushing periodically with that interval in milliseconds</li>
+	 * <li>
+	 * 0 triggers flushing after every record thus minimizing latency</li>
+	 * <li>
+	 * -1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
+	 * </ul>
 	 * 
 	 * @param timeoutMillis
 	 *            The maximum time between two output flushes.
 	 */
 	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
-		if (timeoutMillis < 0) {
-			throw new IllegalArgumentException("Timeout of buffer must be non-negative");
+		if (timeoutMillis < -1 ) {
+			throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
 		}
 
-		this.buffertimeout = timeoutMillis;
+		this.bufferTimeout = timeoutMillis;
 		return this;
 	}
 
 	public long getBufferTimeout() {
-		return this.buffertimeout;
+		return this.bufferTimeout;
 	}
 
 	/**
@@ -465,10 +476,10 @@ public abstract class StreamExecutionEnvironment {
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
 	 * <p>
-	 * The program execution will be logged and displayed with the provided
-	 * name
+	 * The program execution will be logged and displayed with the provided name
 	 * 
-	 * @param jobName Desired name of the job
+	 * @param jobName
+	 *            Desired name of the job
 	 * 
 	 * @throws Exception
 	 **/
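
For reference, a minimal usage sketch of the three flushing modes described by the new
setBufferTimeout javadoc above. The class name is illustrative; the environment factory call is
the same one used in WindowCrossJoinTest further below, and any StreamExecutionEnvironment
behaves the same way.

	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	public class BufferTimeoutExample {
		public static void main(String[] args) {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

			// Each call selects one of the three logical modes; the last value set
			// before the job is built takes effect.
			env.setBufferTimeout(100); // flush at most every 100 ms (the new default)
			env.setBufferTimeout(0);   // flush after every record, minimizing latency
			env.setBufferTimeout(-1);  // flush only when a buffer fills up, maximizing throughput
		}
	}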

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index d3f75dd..8b72195 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 public class OutputHandler<OUT> {
 	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
 
-	private StreamVertex<?,OUT> streamVertex;
+	private StreamVertex<?, OUT> streamVertex;
 	private StreamConfig configuration;
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
@@ -50,7 +50,7 @@ public class OutputHandler<OUT> {
 	StreamRecordSerializer<OUT> outSerializer = null;
 	SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
-	public OutputHandler(StreamVertex<?,OUT> streamComponent) {
+	public OutputHandler(StreamVertex<?, OUT> streamComponent) {
 		this.streamVertex = streamComponent;
 		this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
@@ -119,12 +119,22 @@ public class OutputHandler<OUT> {
 
 		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
-		if (bufferTimeout > 0) {
-			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
-					streamVertex, outputPartitioner, bufferTimeout);
+		if (bufferTimeout >= 0) {
+			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
+					outputPartitioner, bufferTimeout);
+
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
+						bufferTimeout, streamVertex.getClass().getSimpleName());
+			}
+
 		} else {
 			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
 					outputPartitioner);
+
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName());
+			}
 		}
 
 		outputs.add(output);
@@ -136,8 +146,8 @@ public class OutputHandler<OUT> {
 		}
 
 		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
-					.getSimpleName(), outputNumber);
+			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
+					.getSimpleName(), outputNumber, streamVertex.getClass().getSimpleName());
 		}
 	}
 
@@ -155,7 +165,7 @@ public class OutputHandler<OUT> {
 
 	long startTime;
 
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
+	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
 			throws IOException, InterruptedException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} {} invoked with instance id {}", componentTypeName,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 87fd7cd..1237020 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -46,12 +46,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 	// -----------------------------------------------------------------------------------------------------------------
 
 	public StreamRecordWriter(AbstractInvokable invokable) {
-		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+		this(invokable, new RoundRobinChannelSelector<T>(), 100);
 	}
 
 	public StreamRecordWriter(AbstractInvokable invokable,
 			ChannelSelector<T> channelSelector) {
-		this(invokable, channelSelector, 1000);
+		this(invokable, channelSelector, 100);
 	}
 
 	public StreamRecordWriter(AbstractInvokable invokable,
@@ -73,8 +73,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 		this.serializers = new RecordSerializer[numChannels];
 		for (int i = 0; i < this.numChannels; i++) {
 			this.serializers[i] = new SpanningRecordSerializer<T>();
+		}
+		
+		//start a separate thread to handle positive flush intervals
+		if (timeout > 0) {
+			(new OutputFlusher()).start();
 		}
-		(new OutputFlusher()).start();
 	}
 
 	@Override
@@ -99,6 +103,10 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
 									.getBufferSize());
 					result = serializer.setNextBuffer(buffer);
 				}
+			}
+			
+			if (timeout == 0){
+				flush();
 			}
 		}
 	}
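
The OutputFlusher class started above is not part of this diff. As a rough, hedged sketch of the
pattern it stands for when the timeout is positive (names below are illustrative, not the actual
Flink class): a daemon thread sleeps for the configured interval and then triggers a flush, until
the writer shuts down.

	// Illustrative only: periodically runs a flush action until interrupted.
	class PeriodicFlusherSketch extends Thread {
		private final long intervalMillis;
		private final Runnable flushAction;

		PeriodicFlusherSketch(long intervalMillis, Runnable flushAction) {
			this.intervalMillis = intervalMillis;
			this.flushAction = flushAction;
			setDaemon(true); // do not keep the task alive just for flushing
		}

		@Override
		public void run() {
			while (!isInterrupted()) {
				try {
					Thread.sleep(intervalMillis);
				} catch (InterruptedException e) {
					return; // the writer is shutting down
				}
				flushAction.run(); // e.g. delegate to the record writer's flush()
			}
		}
	}

With timeout == 0 no flusher thread is needed, because the emit path above already flushes after
every record; for bufferTimeout == -1 the OutputHandler falls back to the plain RecordWriter,
which only flushes full buffers.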


[2/2] git commit: [FLINK-1174] [streaming] Added window join operator

Posted by mb...@apache.org.
[FLINK-1174] [streaming] Added window join operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/58f7d303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/58f7d303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/58f7d303

Branch: refs/heads/master
Commit: 58f7d303866fad6af4b23d2695d6273ef74b6a1d
Parents: 233161b
Author: ghermann <re...@gmail.com>
Authored: Mon Oct 27 19:04:50 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  43 ++++--
 .../streaming/api/datastream/DataStream.java    |  54 ++++++-
 .../api/function/co/JoinWindowFunction.java     |  55 +++++++
 .../streaming/api/WindowCrossJoinTest.java      | 152 +++++++++++++++++++
 4 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index fafb169..33dbc7e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
@@ -43,9 +44,9 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.ClassTypeWrapper;
 import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
@@ -504,6 +505,31 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize,
 			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
+		return addGeneralWindowJoin(new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval,
+				timestamp1, timestamp2);
+	}
+
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			int fieldIn1, int fieldIn2) {
+
+		return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
+				new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	}
+
+	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+
+		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
+				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
+
+		return addGeneralWindowJoin(joinWindowFunction, windowSize, slideInterval, timestamp1,
+				timestamp2);
+	}
+
+	private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
+			CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
+			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
 		}
@@ -514,16 +540,17 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeWrapper<IN1> in1TypeWrapper = null;
 		TypeWrapper<IN2> in2TypeWrapper = null;
 
-		in1TypeWrapper = new ClassTypeWrapper<IN1>(dataStream1.getOutputType().getTypeClass());
-		in2TypeWrapper = new ClassTypeWrapper<IN2>(dataStream2.getOutputType().getTypeClass());
+		in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType().createSerializer()
+				.createInstance());
+		in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType().createSerializer()
+				.createInstance());
 
 		CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
 				in1TypeWrapper, in2TypeWrapper);
 
-		return addCoFunction("coWindowReduce", new CrossWindowFunction<IN1, IN2>(), in1TypeWrapper,
-				in2TypeWrapper, outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(
-						new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval, timestamp1,
-						timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
+				outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(coWindowFunction,
+						windowSize, slideInterval, timestamp1, timestamp2));
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
@@ -547,8 +574,6 @@ public class ConnectedDataStream<IN1, IN2> {
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
 
-		// TODO consider iteration
-
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 98058df..c777003 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -318,7 +318,7 @@ public class DataStream<OUT> {
 	 * Creates a cross (Cartesian product) of a data stream window.
 	 * 
 	 * @param dataStreamToCross
-	 *            {@link DataStream} to cross with
+	 *            {@link DataStream} to cross with.
 	 * @param windowSize
 	 *            Size of the windows that will be aligned for both streams in
 	 *            milliseconds.
@@ -339,6 +339,58 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Creates a join of a data stream based on the given positions.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
+			int fieldIn2) {
+		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	}
+
+	/**
+	 * Creates a join of a data stream based on the given positions.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+				timestamp2, fieldIn1, fieldIn2);
+	}
+
+	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are partitioned by the hashcodes of the selected fields.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
new file mode 100644
index 0000000..3dea04d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.util.Collector;
+
+public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN1, Object> keySelector1;
+	private KeySelector<IN2, Object> keySelector2;
+
+	public JoinWindowFunction() {
+	}
+
+	public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+			int positionIn1, int positionIn2) {
+		keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
+		keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+	}
+
+	@Override
+	public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
+			throws Exception {
+		for (IN1 item1 : first) {
+			for (IN2 item2 : second) {
+				if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) {
+					out.collect(new Tuple2<IN1, IN2>(item1, item2));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file
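
A hedged usage fragment of the new API (it reuses the env, in1 and in2 values set up in
WindowCrossJoinTest below; variable names are illustrative) showing the windowJoin call that is
ultimately evaluated by this JoinWindowFunction: every pair whose selected fields are equal
within the aligned window is emitted as a Tuple2 of the two elements, exactly as coWindow above
does.

	DataStream<Tuple2<Integer, String>> first = env.fromCollection(in1);
	DataStream<Integer> second = env.fromCollection(in2);

	// Windows of 1000 ms, sliding by 1000 ms; join on field 0 of each side.
	DataStream<Tuple2<Tuple2<Integer, String>, Integer>> joined =
			first.windowJoin(second, 1000, 1000, 0, 0);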

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
new file mode 100644
index 0000000..3a9eedd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+public class WindowCrossJoinTest {
+	private static final long MEMORYSIZE = 32;
+
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+	@Test
+	public void test() throws Exception {
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		env.setBufferTimeout(1);
+
+		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
+		ArrayList<Integer> in2 = new ArrayList<Integer>();
+
+		in1.add(new Tuple2<Integer, String>(10, "a"));
+		in1.add(new Tuple2<Integer, String>(20, "b"));
+		in1.add(new Tuple2<Integer, String>(20, "x"));
+		in1.add(new Tuple2<Integer, String>(0, "y"));
+
+		in2.add(0);
+		in2.add(5);
+		in2.add(20);
+
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 20));
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 20));
+		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 0));
+
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(10, "a"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "b"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(20, "x"), 20));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 0));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 5));
+		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+				new Tuple2<Integer, String>(0, "y"), 20));
+
+		DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
+		DataStream<Integer> inStream2 = env.fromCollection(in2);
+
+		inStream1.windowJoin(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2(), 0, 0)
+				.addSink(new JoinResultSink());
+
+		inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2())
+			.addSink(new CrossResultSink());
+
+		env.executeTest(MEMORYSIZE);
+		
+		assertEquals(joinExpectedResults, joinResults);
+		assertEquals(crossExpectedResults, crossResults);
+	}
+
+	private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Tuple2<Integer, String> value) {
+			return 101L;
+		}
+
+		@Override
+		public long getStartTime() {
+			return 100L;
+		}
+	}
+
+	private static class MyTimestamp2 implements TimeStamp<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return 101L;
+		}
+
+		@Override
+		public long getStartTime() {
+			return 100L;
+		}
+	}
+
+	private static class JoinResultSink implements
+			SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+			joinResults.add(value);
+		}
+	}
+
+	private static class CrossResultSink implements
+			SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+			crossResults.add(value);
+		}
+	}
+}