You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/04 23:22:43 UTC
[1/2] git commit: [streaming] Enhanced BufferTimeout functionality for StreamRecordWriter
Repository: incubator-flink
Updated Branches:
refs/heads/master 233161b25 -> 5e9a45491
[streaming] Enhanced BufferTimeout functionality for StreamRecordWriter
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5e9a4549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5e9a4549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5e9a4549
Branch: refs/heads/master
Commit: 5e9a4549161c45c85434347d6a2f256953e9883b
Parents: 58f7d30
Author: mbalassi <ba...@gmail.com>
Authored: Tue Nov 4 12:14:33 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 31 +++++++++++++-------
.../api/streamvertex/OutputHandler.java | 26 +++++++++++-----
.../flink/streaming/io/StreamRecordWriter.java | 14 +++++++--
3 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4d34217..4b7afbb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -59,7 +59,7 @@ public abstract class StreamExecutionEnvironment {
private int executionParallelism = -1;
- private long buffertimeout = 0;;
+ private long bufferTimeout = 100;
protected JobGraphBuilder jobGraphBuilder;
@@ -112,23 +112,34 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffers. By default the output buffers flush only when they are full.
+ * Sets the maximum time interval (in milliseconds) between flushes of the
+ * output buffers. By default the output buffers flush frequently to provide
+ * low latency and to aid a smooth developer experience. Setting the parameter
+ * selects one of three logical modes:
+ *
+ * <ul>
+ * <li>
+ * A positive integer triggers periodic flushing, with that many milliseconds between flushes</li>
+ * <li>
+ * 0 triggers flushing after every record thus minimizing latency</li>
+ * <li>
+ * -1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
+ * </ul>
*
* @param timeoutMillis
* The maximum time between two output flushes.
*/
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
- if (timeoutMillis < 0) {
- throw new IllegalArgumentException("Timeout of buffer must be non-negative");
+ if (timeoutMillis < -1 ) {
+ throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
}
- this.buffertimeout = timeoutMillis;
+ this.bufferTimeout = timeoutMillis;
return this;
}
public long getBufferTimeout() {
- return this.buffertimeout;
+ return this.bufferTimeout;
}
/**
@@ -465,10 +476,10 @@ public abstract class StreamExecutionEnvironment {
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
* <p>
- * The program execution will be logged and displayed with the provided
- * name
+ * The program execution will be logged and displayed with the provided name
*
- * @param jobName Desired name of the job
+ * @param jobName
+ * Desired name of the job
*
* @throws Exception
**/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index d3f75dd..8b72195 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
public class OutputHandler<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
- private StreamVertex<?,OUT> streamVertex;
+ private StreamVertex<?, OUT> streamVertex;
private StreamConfig configuration;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
@@ -50,7 +50,7 @@ public class OutputHandler<OUT> {
StreamRecordSerializer<OUT> outSerializer = null;
SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
- public OutputHandler(StreamVertex<?,OUT> streamComponent) {
+ public OutputHandler(StreamVertex<?, OUT> streamComponent) {
this.streamVertex = streamComponent;
this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
@@ -119,12 +119,22 @@ public class OutputHandler<OUT> {
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
- if (bufferTimeout > 0) {
- output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
- streamVertex, outputPartitioner, bufferTimeout);
+ if (bufferTimeout >= 0) {
+ output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
+ outputPartitioner, bufferTimeout);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
+ bufferTimeout, streamVertex.getClass().getSimpleName());
+ }
+
} else {
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
outputPartitioner);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName());
+ }
}
outputs.add(output);
@@ -136,8 +146,8 @@ public class OutputHandler<OUT> {
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
- .getSimpleName(), outputNumber);
+ LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
+ .getSimpleName(), outputNumber, streamVertex.getClass().getSimpleName());
}
}
@@ -155,7 +165,7 @@ public class OutputHandler<OUT> {
long startTime;
- public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
+ public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} invoked with instance id {}", componentTypeName,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5e9a4549/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 87fd7cd..1237020 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -46,12 +46,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
// -----------------------------------------------------------------------------------------------------------------
public StreamRecordWriter(AbstractInvokable invokable) {
- this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+ this(invokable, new RoundRobinChannelSelector<T>(), 100);
}
public StreamRecordWriter(AbstractInvokable invokable,
ChannelSelector<T> channelSelector) {
- this(invokable, channelSelector, 1000);
+ this(invokable, channelSelector, 100);
}
public StreamRecordWriter(AbstractInvokable invokable,
@@ -73,8 +73,12 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
this.serializers = new RecordSerializer[numChannels];
for (int i = 0; i < this.numChannels; i++) {
this.serializers[i] = new SpanningRecordSerializer<T>();
+ }
+
+ //start a separate thread to handle positive flush intervals
+ if (timeout > 0) {
+ (new OutputFlusher()).start();
}
- (new OutputFlusher()).start();
}
@Override
@@ -99,6 +103,10 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends
.getBufferSize());
result = serializer.setNextBuffer(buffer);
}
+ }
+
+ if (timeout == 0){
+ flush();
}
}
}
[2/2] git commit: [FLINK-1174] [streaming] Added window join operator
Posted by mb...@apache.org.
[FLINK-1174] [streaming] Added window join operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/58f7d303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/58f7d303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/58f7d303
Branch: refs/heads/master
Commit: 58f7d303866fad6af4b23d2695d6273ef74b6a1d
Parents: 233161b
Author: ghermann <re...@gmail.com>
Authored: Mon Oct 27 19:04:50 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Tue Nov 4 22:38:15 2014 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 43 ++++--
.../streaming/api/datastream/DataStream.java | 54 ++++++-
.../api/function/co/JoinWindowFunction.java | 55 +++++++
.../streaming/api/WindowCrossJoinTest.java | 152 +++++++++++++++++++
4 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index fafb169..33dbc7e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
@@ -43,9 +44,9 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.ClassTypeWrapper;
import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
@@ -504,6 +505,31 @@ public class ConnectedDataStream<IN1, IN2> {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ return addGeneralWindowJoin(new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval,
+ timestamp1, timestamp2);
+ }
+
+ SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+ int fieldIn1, int fieldIn2) {
+
+ return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
+ new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+ }
+
+ SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+ TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+
+ JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
+ dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
+
+ return addGeneralWindowJoin(joinWindowFunction, windowSize, slideInterval, timestamp1,
+ timestamp2);
+ }
+
+ private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
+ CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
+ long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
@@ -514,16 +540,17 @@ public class ConnectedDataStream<IN1, IN2> {
TypeWrapper<IN1> in1TypeWrapper = null;
TypeWrapper<IN2> in2TypeWrapper = null;
- in1TypeWrapper = new ClassTypeWrapper<IN1>(dataStream1.getOutputType().getTypeClass());
- in2TypeWrapper = new ClassTypeWrapper<IN2>(dataStream2.getOutputType().getTypeClass());
+ in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType().createSerializer()
+ .createInstance());
+ in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType().createSerializer()
+ .createInstance());
CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
in1TypeWrapper, in2TypeWrapper);
- return addCoFunction("coWindowReduce", new CrossWindowFunction<IN1, IN2>(), in1TypeWrapper,
- in2TypeWrapper, outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(
- new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval, timestamp1,
- timestamp2));
+ return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(coWindowFunction,
+ windowSize, slideInterval, timestamp1, timestamp2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
@@ -547,8 +574,6 @@ public class ConnectedDataStream<IN1, IN2> {
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
- // TODO consider iteration
-
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 98058df..c777003 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -318,7 +318,7 @@ public class DataStream<OUT> {
* Creates a cross (Cartesian product) of a data stream window.
*
* @param dataStreamToCross
- * {@link DataStream} to cross with
+ * {@link DataStream} to cross with.
* @param windowSize
* Size of the windows that will be aligned for both streams in
* milliseconds.
@@ -339,6 +339,58 @@ public class DataStream<OUT> {
}
/**
+ * Creates a join of a data stream based on the given positions.
+ *
+ * @param dataStreamToJoin
+ * {@link DataStream} to join with.
+ * @param windowSize
+ * Size of the windows that will be aligned for both streams in
+ * milliseconds.
+ * @param slideInterval
+ * After every function call the windows will be slid by this
+ * interval.
+ * @param fieldIn1
+ * The field in the first stream to be matched.
+ * @param fieldIn2
+ * The field in the second stream to be matched.
+ * @return The transformed {@link DataStream}.
+ */
+ public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+ DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
+ int fieldIn2) {
+ return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+ new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+ }
+
+ /**
+ * Creates a join of a data stream based on the given positions.
+ *
+ * @param dataStreamToJoin
+ * {@link DataStream} to join with.
+ * @param windowSize
+ * Size of the windows that will be aligned for both streams in
+ * milliseconds.
+ * @param slideInterval
+ * After every function call the windows will be slid by this
+ * interval.
+ * @param timestamp1
+ * User defined time stamps for the first input.
+ * @param timestamp2
+ * User defined time stamps for the second input.
+ * @param fieldIn1
+ * The field in the first stream to be matched.
+ * @param fieldIn2
+ * The field in the second stream to be matched.
+ * @return The transformed {@link DataStream}.
+ */
+ public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+ DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+ TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+ return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+ timestamp2, fieldIn1, fieldIn2);
+ }
+
+ /**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are partitioned by the hashcodes of the selected fields.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
new file mode 100644
index 0000000..3dea04d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.util.Collector;
+
+public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
+ private static final long serialVersionUID = 1L;
+
+ private KeySelector<IN1, Object> keySelector1;
+ private KeySelector<IN2, Object> keySelector2;
+
+ public JoinWindowFunction() {
+ }
+
+ public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+ int positionIn1, int positionIn2) {
+ keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
+ keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+ }
+
+ @Override
+ public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
+ throws Exception {
+ for (IN1 item1 : first) {
+ for (IN2 item2 : second) {
+ if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) {
+ out.collect(new Tuple2<IN1, IN2>(item1, item2));
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/58f7d303/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
new file mode 100644
index 0000000..3a9eedd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+public class WindowCrossJoinTest {
+ private static final long MEMORYSIZE = 32;
+
+ private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+ private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+ private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+ private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+
+ @Test
+ public void test() throws Exception {
+ LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ env.setBufferTimeout(1);
+
+ ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
+ ArrayList<Integer> in2 = new ArrayList<Integer>();
+
+ in1.add(new Tuple2<Integer, String>(10, "a"));
+ in1.add(new Tuple2<Integer, String>(20, "b"));
+ in1.add(new Tuple2<Integer, String>(20, "x"));
+ in1.add(new Tuple2<Integer, String>(0, "y"));
+
+ in2.add(0);
+ in2.add(5);
+ in2.add(20);
+
+ joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "b"), 20));
+ joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "x"), 20));
+ joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(0, "y"), 0));
+
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(10, "a"), 0));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(10, "a"), 5));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(10, "a"), 20));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "b"), 0));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "b"), 5));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "b"), 20));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "x"), 0));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "x"), 5));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(20, "x"), 20));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(0, "y"), 0));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(0, "y"), 5));
+ crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
+ new Tuple2<Integer, String>(0, "y"), 20));
+
+ DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
+ DataStream<Integer> inStream2 = env.fromCollection(in2);
+
+ inStream1.windowJoin(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2(), 0, 0)
+ .addSink(new JoinResultSink());
+
+ inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2())
+ .addSink(new CrossResultSink());
+
+ env.executeTest(MEMORYSIZE);
+
+ assertEquals(joinExpectedResults, joinResults);
+ assertEquals(crossExpectedResults, crossResults);
+ }
+
+ private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Tuple2<Integer, String> value) {
+ return 101L;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 100L;
+ }
+ }
+
+ private static class MyTimestamp2 implements TimeStamp<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return 101L;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 100L;
+ }
+ }
+
+ private static class JoinResultSink implements
+ SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+ joinResults.add(value);
+ }
+ }
+
+ private static class CrossResultSink implements
+ SinkFunction<Tuple2<Tuple2<Integer, String>, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple2<Tuple2<Integer, String>, Integer> value) {
+ crossResults.add(value);
+ }
+ }
+}