You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:54 UTC
[5/8] flink git commit: [FLINK-2819] Add Windowed Join/CoGroup
Operator Based on Tagged Union
[FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union
Right now, this does everything in memory, so the JVM will run out of memory
if the data for one key becomes too large.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8634dbbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8634dbbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8634dbbe
Branch: refs/heads/master
Commit: 8634dbbe998af53471bed2f3a6557c722bb37b87
Parents: 47b5cb7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 16:33:04 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200
----------------------------------------------------------------------
.../api/datastream/CoGroupedStreams.java | 563 +++++++++++++++++++
.../streaming/api/datastream/DataStream.java | 36 +-
.../streaming/api/datastream/JoinedStreams.java | 328 +++++++++++
.../datastream/temporal/StreamJoinOperator.java | 274 ---------
.../datastream/temporal/TemporalOperator.java | 124 ----
.../api/datastream/temporal/TemporalWindow.java | 45 --
.../windowing/ReduceWindowFunction.java | 32 +-
.../flink/streaming/api/CoGroupJoinITCase.java | 372 ++++++++++++
.../streaming/api/WindowCrossJoinTest.java | 143 -----
.../api/operators/co/SelfConnectionTest.java | 61 --
.../streaming/examples/join/WindowJoin.java | 128 +++--
.../examples/join/util/WindowJoinData.java | 70 +--
.../scala/examples/join/WindowJoin.scala | 43 +-
.../join/WindowJoinITCase.java | 101 ++--
.../join/WindowJoinITCase.java | 101 ++--
.../streaming/api/scala/CoGroupedStreams.scala | 294 ++++++++++
.../flink/streaming/api/scala/DataStream.scala | 24 +-
.../streaming/api/scala/JoinedStreams.scala | 303 ++++++++++
.../api/scala/StreamJoinOperator.scala | 203 -------
.../streaming/api/scala/TemporalOperator.scala | 51 --
.../streaming/api/scala/CoGroupJoinITCase.scala | 274 +++++++++
.../StreamingScalaAPICompletenessTest.scala | 14 +-
22 files changed, 2409 insertions(+), 1175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
new file mode 100644
index 0000000..e1f1a96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the co-group operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Implementation: the elements of both inputs are wrapped in a {@link TaggedUnion}, the two
+ * tagged streams are unioned, keyed with the respective key selector and windowed. For every
+ * window the elements are split back into the two inputs and handed to the user's
+ * {@link CoGroupFunction}.
+ *
+ * <p>
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might run out of memory.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MySecondKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction());
+ * } </pre>
+ */
+public class CoGroupedStreams {
+
+	/**
+	 * A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 */
+	public static class Unspecified<T1, T2> {
+		final DataStream<T1> input1;
+		final DataStream<T2> input2;
+
+		protected Unspecified(DataStream<T1> input1,
+				DataStream<T2> input2) {
+			this.input1 = input1;
+			this.input2 = input2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
+			return new WithKey<>(input1, input2, input1.clean(keySelector), null);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
+			// the selector belongs to the second input, so clean it through that stream
+			return new WithKey<>(input1, input2, null, input2.clean(keySelector));
+		}
+	}
+
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for either both or
+	 * one input.
+	 *
+	 * <p>
+	 * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
+	 * and {@link #equalTo(KeySelector)} before you can proceed with specifying a
+	 * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 */
+	public static class WithKey<T1, T2, KEY> {
+		final DataStream<T1> input1;
+		final DataStream<T2> input2;
+
+		// either selector may still be null until both where() and equalTo() were called
+		final KeySelector<T1, KEY> keySelector1;
+		final KeySelector<T2, KEY> keySelector2;
+
+		protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
+			return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
+			// the selector belongs to the second input, so clean it through that stream
+			return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input2.clean(keySelector));
+		}
+
+		/**
+		 * Specifies the window on which the co-group operation works.
+		 *
+		 * @throws UnsupportedOperationException if a key selector is missing for either input
+		 */
+		public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+			if (keySelector1 == null || keySelector2 == null) {
+				throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+			}
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
+		}
+	}
+
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
+	 * well as a {@link WindowAssigner}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <W> Type of {@link Window} on which the co-group operation works.
+	 */
+	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		private final DataStream<T1> input1;
+		private final DataStream<T2> input2;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+		// null means: keep the default of the windowed stream
+		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+		// null means: no evictor
+		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+		protected WithWindow(DataStream<T1> input1,
+				DataStream<T2> input2,
+				KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2,
+				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+
+			this.windowAssigner = windowAssigner;
+			this.trigger = trigger;
+			this.evictor = evictor;
+		}
+
+		/**
+		 * Sets the {@code Trigger} that should be used to trigger window emission.
+		 */
+		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+		}
+
+		/**
+		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+		 *
+		 * <p>
+		 * Note: When using an evictor window performance will degrade significantly, since
+		 * pre-aggregation of window results cannot be used.
+		 */
+		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups. The result type is extracted from the function.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
+
+			// a CoGroupFunction takes two Iterables and emits through a Collector
+			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+					function,
+					CoGroupFunction.class,
+					true,
+					true,
+					input1.getType(),
+					input2.getType(),
+					"CoGroup",
+					false);
+
+			return apply(function, resultType);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups, using the given result type.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			//clean the closure
+			function = input1.getExecutionEnvironment().clean(function);
+
+			// both inputs are mapped to the same tagged-union type so they can be unioned
+			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
+
+			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
+					.map(new Input1Tagger<T1, T2>())
+					.returns(unionType);
+			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
+					.map(new Input2Tagger<T1, T2>())
+					.returns(unionType);
+
+			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1
+					.union(taggedInput2)
+					.keyBy(new UnionKeySelector<>(keySelector1, keySelector2))
+					.window(windowAssigner);
+
+			// use the returned stream instead of relying on trigger()/evictor() mutating windowOp
+			if (trigger != null) {
+				windowOp = windowOp.trigger(trigger);
+			}
+			if (evictor != null) {
+				windowOp = windowOp.evictor(evictor);
+			}
+
+			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
+		}
+	}
+
+	/**
+	 * Creates a new co-group operation from the two given inputs.
+	 */
+	public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) {
+		return new Unspecified<>(input1, input2);
+	}
+
+	/**
+	 * Internal class for implementing tagged union co-group.
+	 *
+	 * <p>
+	 * Instances are created through {@link #one(Object)} or {@link #two(Object)}; the side that
+	 * was not set is {@code null}, which is what {@link #isOne()} / {@link #isTwo()} check.
+	 */
+	public static class TaggedUnion<T1, T2> {
+		private final T1 one;
+		private final T2 two;
+
+		private TaggedUnion(T1 one, T2 two) {
+			this.one = one;
+			this.two = two;
+		}
+
+		public boolean isOne() {
+			return one != null;
+		}
+
+		public boolean isTwo() {
+			return two != null;
+		}
+
+		public T1 getOne() {
+			return one;
+		}
+
+		public T2 getTwo() {
+			return two;
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
+			return new TaggedUnion<>(one, null);
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
+			return new TaggedUnion<>(null, two);
+		}
+	}
+
+	/**
+	 * Type information for {@link TaggedUnion}, built from the type information of both inputs.
+	 */
+	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeInformation<T1> oneType;
+		private final TypeInformation<T2> twoType;
+
+		public UnionTypeInfo(TypeInformation<T1> oneType,
+				TypeInformation<T2> twoType) {
+			this.oneType = oneType;
+			this.twoType = twoType;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 2;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 2;
+		}
+
+		@Override
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		public Class<TaggedUnion<T1, T2>> getTypeClass() {
+			// generic class literals do not exist, so go through the raw type
+			return (Class) TaggedUnion.class;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
+			return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
+		}
+
+		@Override
+		public String toString() {
+			return "TaggedUnion<" + oneType + ", " + twoType + ">";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionTypeInfo) {
+				@SuppressWarnings("unchecked")
+				UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj;
+
+				return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneType.hashCode() + twoType.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionTypeInfo;
+		}
+	}
+
+	/**
+	 * Serializer for {@link TaggedUnion}.
+	 *
+	 * <p>
+	 * Wire format: one tag byte ({@code 1} for the first input, {@code 2} for the second)
+	 * followed by the value written by the serializer of that input. On read, any tag other
+	 * than {@code 1} is treated as the second input.
+	 */
+	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeSerializer<T1> oneSerializer;
+		private final TypeSerializer<T2> twoSerializer;
+
+		public UnionSerializer(TypeSerializer<T1> oneSerializer,
+				TypeSerializer<T2> twoSerializer) {
+			this.oneSerializer = oneSerializer;
+			this.twoSerializer = twoSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
+			TypeSerializer<T1> duplicateOne = oneSerializer.duplicate();
+			TypeSerializer<T2> duplicateTwo = twoSerializer.duplicate();
+
+			// stateless nested serializers return themselves from duplicate(); only build a new
+			// instance when at least one of them is stateful and was actually copied
+			if (duplicateOne == oneSerializer && duplicateTwo == twoSerializer) {
+				return this;
+			}
+			return new UnionSerializer<>(duplicateOne, duplicateTwo);
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> createInstance() {
+			return null;
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
+			if (from.isOne()) {
+				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
+			} else {
+				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
+			// TaggedUnion is immutable, so the reuse object cannot be filled in
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			// the serialized size depends on the tagged value: variable length
+			return -1;
+		}
+
+		@Override
+		public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
+			if (record.isOne()) {
+				target.writeByte(1);
+				oneSerializer.serialize(record.getOne(), target);
+			} else {
+				target.writeByte(2);
+				twoSerializer.serialize(record.getTwo(), target);
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
+			byte tag = source.readByte();
+			if (tag == 1) {
+				return TaggedUnion.one(oneSerializer.deserialize(source));
+			} else {
+				return TaggedUnion.two(twoSerializer.deserialize(source));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
+				DataInputView source) throws IOException {
+			// TaggedUnion is immutable, so the reuse object cannot be filled in
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			byte tag = source.readByte();
+			target.writeByte(tag);
+			if (tag == 1) {
+				oneSerializer.copy(source, target);
+			} else {
+				twoSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionSerializer) {
+				UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
+
+				return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionSerializer;
+		}
+	}
+
+	/**
+	 * Wraps elements of the first input as {@link TaggedUnion#one(Object)}.
+	 */
+	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T1 value) throws Exception {
+			return TaggedUnion.one(value);
+		}
+	}
+
+	/**
+	 * Wraps elements of the second input as {@link TaggedUnion#two(Object)}.
+	 */
+	private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T2 value) throws Exception {
+			return TaggedUnion.two(value);
+		}
+	}
+
+	/**
+	 * Extracts the key of a tagged element with the key selector of the input it came from.
+	 */
+	private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
+		private static final long serialVersionUID = 1L;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2) {
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		@Override
+		public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
+			if (value.isOne()) {
+				return keySelector1.getKey(value.getOne());
+			} else {
+				return keySelector2.getKey(value.getTwo());
+			}
+		}
+	}
+
+	/**
+	 * Window function that splits the tagged elements of one window back into the two inputs
+	 * (buffered in memory) and calls the wrapped {@link CoGroupFunction} once per window and key.
+	 */
+	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
+			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
+			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+		private static final long serialVersionUID = 1L;
+
+		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void apply(KEY key,
+				W window,
+				Iterable<TaggedUnion<T1, T2>> values,
+				Collector<T> out) throws Exception {
+			List<T1> oneValues = Lists.newArrayList();
+			List<T2> twoValues = Lists.newArrayList();
+			for (TaggedUnion<T1, T2> val : values) {
+				if (val.isOne()) {
+					oneValues.add(val.getOne());
+				} else {
+					twoValues.add(val.getTwo());
+				}
+			}
+			wrappedFunction.coGroup(oneValues, twoValues, out);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8de1a0d..0be1d56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,7 +44,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
@@ -618,30 +617,19 @@ public class DataStream<T> {
}
/**
- * Initiates a temporal Join transformation. <br/>
- * A temporal Join transformation joins the elements of two
- * {@link DataStream}s on key equality over a specified time window.
- *
- * <p>
- * This method returns a {@link StreamJoinOperator} on which the
- * {@link StreamJoinOperator#onWindow(long, java.util.concurrent.TimeUnit)}
- * should be called to define the window, and then the
- * {@link StreamJoinOperator.JoinWindow#where(int...)} and
- * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used to define
- * the join keys.
- * <p>
- * The user can also use the
- * {@link org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator.JoinPredicate.JoinedStream#with}
- * to apply a custom join function.
- *
- * @param dataStreamToJoin
- * The other DataStream with which this DataStream is joined.
- * @return A {@link StreamJoinOperator} to continue the definition of the
- * Join transformation.
- *
+ * Creates a co-group operation. See {@link CoGroupedStreams} for an example of how the keys
+ * and window can be specified.
+ */
+ public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) {
+ return CoGroupedStreams.createCoGroup(this, otherStream);
+ }
+
+ /**
+ * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
+ * and window can be specified.
*/
- public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin) {
- return new StreamJoinOperator<T, IN2>(this, dataStreamToJoin);
+ public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) {
+ return JoinedStreams.createJoin(this, otherStream);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
new file mode 100644
index 0000000..ee848e3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the join operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the the join is being evaluated in memory so you need to ensure that the number
+ * of elements per key does not get too high. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> twp = ...;
+ *
+ * DataStream<T> result = one.join(two)
+ * .where(new MyFirstKeySelector())
+ * .equalTo(new MyFirstKeySelector())
+ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ * .apply(new MyJoinFunction());
+ * } </pre>
+ */
+public class JoinedStreams extends CoGroupedStreams{
+
+ /**
+ * A join operation that does not yet have its {@link KeySelector KeySelectors} defined.
+ *
+ * @param <T1> Type of the elements from the first input
+ * @param <T2> Type of the elements from the second input
+ */
+ public static class Unspecified<T1, T2> {
+ DataStream<T1> input1;
+ DataStream<T2> input2;
+
+ protected Unspecified(DataStream<T1> input1,
+ DataStream<T2> input2) {
+ this.input1 = input1;
+ this.input2 = input2;
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the first input.
+ */
+ public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
+ return new WithKey<>(input1, input2, keySelector, null);
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the second input.
+ */
+ public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
+ return new WithKey<>(input1, input2, null, keySelector);
+ }
+ }
+
+ /**
+ * A join operation that has {@link KeySelector KeySelectors} defined for either both or
+ * one input.
+ *
+ * <p>
+ * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
+ * and {@link #equalTo(KeySelector)} before you can proceeed with specifying a
+ * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+ *
+ * @param <T1> Type of the elements from the first input
+ * @param <T2> Type of the elements from the second input
+ * @param <KEY> Type of the key. This must be the same for both inputs
+ */
+ public static class WithKey<T1, T2, KEY> {
+ DataStream<T1> input1;
+ DataStream<T2> input2;
+
+ KeySelector<T1, KEY> keySelector1;
+ KeySelector<T2, KEY> keySelector2;
+
+ protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+ this.input1 = input1;
+ this.input2 = input2;
+
+ this.keySelector1 = keySelector1;
+ this.keySelector2 = keySelector2;
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the first input.
+ */
+ public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
+ return new JoinedStreams.WithKey<>(input1, input2, keySelector, keySelector2);
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the second input.
+ */
+ public JoinedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
+ return new JoinedStreams.WithKey<>(input1, input2, keySelector1, keySelector);
+ }
+
+ /**
+ * Specifies the window on which the join operation works.
+ */
+ public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+ if (keySelector1 == null || keySelector2 == null) {
+ throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+
+ }
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
+ }
+ }
+
+ /**
+ * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
+ * well as a {@link WindowAssigner}.
+ *
+ * @param <T1> Type of the elements from the first input
+ * @param <T2> Type of the elements from the second input
+ * @param <KEY> Type of the key. This must be the same for both inputs
+ * @param <W> Type of {@link Window} on which the join operation works.
+ */
+ public static class WithWindow<T1, T2, KEY, W extends Window> {
+ private final DataStream<T1> input1;
+ private final DataStream<T2> input2;
+
+ private final KeySelector<T1, KEY> keySelector1;
+ private final KeySelector<T2, KEY> keySelector2;
+
+ private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+ private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+ private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+ protected WithWindow(DataStream<T1> input1,
+ DataStream<T2> input2,
+ KeySelector<T1, KEY> keySelector1,
+ KeySelector<T2, KEY> keySelector2,
+ WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+ Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+ Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+ this.input1 = input1;
+ this.input2 = input2;
+
+ this.keySelector1 = keySelector1;
+ this.keySelector2 = keySelector2;
+
+ this.windowAssigner = windowAssigner;
+ this.trigger = trigger;
+ this.evictor = evictor;
+ }
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
+ TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+ function,
+ JoinFunction.class,
+ true,
+ true,
+ input1.getType(),
+ input2.getType(),
+ "CoGroup",
+ false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+ //clean the closure
+ function = input1.getExecutionEnvironment().clean(function);
+
+ return input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(new FlatJoinCoGroupFunction<>(function), resultType);
+
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
+ TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+ function,
+ JoinFunction.class,
+ true,
+ true,
+ input1.getType(),
+ input2.getType(),
+ "CoGroup",
+ false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+ //clean the closure
+ function = input1.getExecutionEnvironment().clean(function);
+
+ return input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(new JoinCoGroupFunction<>(function), resultType);
+
+ }
+ }
+
+ /**
+ * Creates a new join operation from the two given inputs.
+ */
+ public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> input1, DataStream<T2> input2) {
+ return new Unspecified<>(input1, input2);
+ }
+
+ /**
+ * CoGroup function that does a nested-loop join to get the join result.
+ */
+ private static class JoinCoGroupFunction<T1, T2, T>
+ extends WrappingFunction<JoinFunction<T1, T2, T>>
+ implements CoGroupFunction<T1, T2, T> {
+ private static final long serialVersionUID = 1L;
+
+ public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+ for (T1 val1: first) {
+ for (T2 val2: second) {
+ out.collect(wrappedFunction.join(val1, val2));
+ }
+ }
+ }
+ }
+
+ /**
+ * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
+ */
+ private static class FlatJoinCoGroupFunction<T1, T2, T>
+ extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
+ implements CoGroupFunction<T1, T2, T> {
+ private static final long serialVersionUID = 1L;
+
+ public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+ for (T1 val1: first) {
+ for (T2 val2: second) {
+ wrappedFunction.join(val1, val2, out);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
deleted file mode 100644
index 4a5622d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-/**
- * Builder for a temporal (windowed) join of two DataStreams, used in the steps
- * onWindow(...) -> where(...) -> equalTo(...) -> with(...).
- *
- * NOTE(review): createJoinOperator() and JoinedStream#with() always return null
- * because their implementations are commented out, so this builder cannot
- * produce a usable join in its current state.
- */
-public class StreamJoinOperator<I1, I2> extends
- TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
-
- public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
- super(input1, input2);
- }
-
- // Called by TemporalOperator#onWindow once window size/slide and timestamps are set.
- @Override
- protected JoinWindow<I1, I2> createNextWindowOperator() {
- return new JoinWindow<I1, I2>(this);
- }
-
- /**
- * Builder step after onWindow(): selects the join key of the first input
- * (via where) and optionally overrides the slide interval (via every).
- */
- public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> {
-
- private StreamJoinOperator<I1, I2> op;
- private TypeInformation<I1> type1;
-
- private JoinWindow(StreamJoinOperator<I1, I2> operator) {
- this.op = operator;
- this.type1 = op.input1.getType();
- }
-
- /**
- * Continues a temporal Join transformation. <br/>
- * Defines the {@link Tuple} fields of the first join {@link DataStream}
- * that should be used as join keys.<br/>
- * <b>Note: Fields can only be selected as join keys on Tuple
- * DataStreams.</b><br/>
- *
- * @param fields
- * The indexes of the other Tuple fields of the first join
- * DataStreams that should be used as keys.
- * @return An incomplete Join transformation. Call
- * {@link JoinPredicate#equalTo} to continue the Join.
- */
- public JoinPredicate<I1, I2> where(int... fields) {
- return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
- }
-
- /**
- * Continues a temporal join transformation. <br/>
- * Defines the fields of the first join {@link DataStream} that should
- * be used as grouping keys. Fields are the names of member fields of
- * the underlying type of the data stream.
- *
- * @param fields
- * The fields of the first join DataStream that should be
- * used as keys.
- * @return An incomplete Join transformation. Call
- * {@link JoinPredicate#equalTo} to continue the Join.
- */
- public JoinPredicate<I1, I2> where(String... fields) {
- return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
- }
-
- /**
- * Continues a temporal Join transformation and defines a
- * {@link KeySelector} function for the first join {@link DataStream}
- * .</br> The KeySelector function is called for each element of the
- * first DataStream and extracts a single key value on which the
- * DataStream is joined. </br>
- *
- * @param keySelector
- * The KeySelector function which extracts the key values
- * from the DataStream on which it is joined.
- * @return An incomplete Join transformation. Call
- * {@link JoinPredicate#equalTo} to continue the Join.
- */
- public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
- return new JoinPredicate<I1, I2>(op, keySelector);
- }
-
- // Converts the slide interval to milliseconds and delegates to every(long).
- @Override
- public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) {
- return every(timeUnit.toMillis(length));
- }
-
- // Overwrites the slide interval stored on the enclosing operator
- // (onWindow initialises it to the window size, i.e. tumbling windows).
- @Override
- public JoinWindow<I1, I2> every(long length) {
- op.slideInterval = length;
- return this;
- }
-
- // ----------------------------------------------------------------------------------------
-
- }
-
- /**
- * Intermediate step of a temporal Join transformation. <br/>
- * To continue the Join transformation, select the join key of the second
- * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
- *
- */
- public static class JoinPredicate<I1, I2> {
-
- private StreamJoinOperator<I1, I2> op;
- private KeySelector<I1, ?> keys1;
- private KeySelector<I2, ?> keys2;
- private TypeInformation<I2> type2;
-
- private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
- this.op = operator;
- this.keys1 = keys1;
- this.type2 = op.input2.getType();
- }
-
- /**
- * Creates a temporal Join transformation and defines the {@link Tuple}
- * fields of the second join {@link DataStream} that should be used as
- * join keys.<br/>
- * </p> The resulting operator wraps each pair of joining elements in a
- * Tuple2<I1,I2>(first, second). To use a different wrapping function
- * use {@link JoinedStream#with(JoinFunction)}
- *
- * @param fields
- * The indexes of the Tuple fields of the second join
- * DataStream that should be used as keys.
- * @return A streaming join operator. Call {@link JoinedStream#with} to
- * apply a custom wrapping
- */
- public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(int... fields) {
- // NOTE(review): the second input's key selector is built with input1's
- // ExecutionConfig; presumably both inputs share one environment — confirm.
- keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
- type2, op.input1.getExecutionEnvironment().getConfig());
- return createJoinOperator();
- }
-
- /**
- * Creates a temporal Join transformation and defines the fields of the
- * second join {@link DataStream} that should be used as join keys. </p>
- * The resulting operator wraps each pair of joining elements in a
- * Tuple2<I1,I2>(first, second). To use a different wrapping function
- * use {@link JoinedStream#with(JoinFunction)}
- *
- * @param fields
- * The fields of the second join DataStream that should be
- * used as keys.
- * @return A streaming join operator. Call {@link JoinedStream#with} to
- * apply a custom wrapping
- */
- public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(String... fields) {
- // NOTE(review): same as above — input1's ExecutionConfig is used for input2's keys.
- this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
- type2), type2, op.input1.getExecutionEnvironment().getConfig());
- return createJoinOperator();
- }
-
- /**
- * Creates a temporal Join transformation and defines a
- * {@link KeySelector} function for the second join {@link DataStream}
- * .</br> The KeySelector function is called for each element of the
- * second DataStream and extracts a single key value on which the
- * DataStream is joined. </p> The resulting operator wraps each pair of
- * joining elements in a Tuple2<I1,I2>(first, second). To use a
- * different wrapping function use
- * {@link JoinedStream#with(JoinFunction)}
- *
- *
- * @param keySelector
- * The KeySelector function which extracts the key values
- * from the second DataStream on which it is joined.
- * @return A streaming join operator. Call {@link JoinedStream#with} to
- * apply a custom wrapping
- */
- public <K> JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(KeySelector<I2, K> keySelector) {
- this.keys2 = keySelector;
- return createJoinOperator();
- }
-
- // NOTE(review): the operator construction is commented out below, so this
- // method always returns null and every equalTo(...) call yields null.
- private JoinedStream<I1, I2, Tuple2<I1, I2>> createJoinOperator() {
-
-// JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
-//
-// JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
-// joinFunction, this);
-//
-// TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
-// op.input1.getType(), op.input2.getType());
-
-// return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
-// .keyBy(keys1)
-// .connect(op.input2.keyBy(keys2))
-// .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-// op.slideInterval, op.timeStamp1, op.timeStamp2));
- return null;
- }
-
- /**
- * Result of equalTo(...): a stream of joined pairs that can be re-wrapped
- * with a custom JoinFunction via with(...).
- */
- public static class JoinedStream<I1, I2, R> extends
- SingleOutputStreamOperator<R, JoinedStream<I1, I2, R>> {
- private final JoinPredicate<I1, I2> predicate;
-
- private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<R> ds) {
- super(ds.getExecutionEnvironment(), ds.getTransformation());
- this.predicate = predicate;
- }
-
- /**
- * Completes a stream join. </p> The resulting operator wraps each pair
- * of joining elements using the user defined {@link JoinFunction}
- *
- * @return The joined data stream.
- */
- @SuppressWarnings("unchecked")
- public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
-
- // NOTE(review): outType is computed but never used, and the construction
- // below is commented out, so this method always returns null.
- TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
- predicate.op.input1.getType(), predicate.op.input2.getType());
-
-// JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
-//
-
-// return new JoinedStream<I1, I2, OUT>(
-// predicate, predicate.op.input1
-// .keyBy(predicate.keys1)
-// .connect(predicate.op.input2.keyBy(predicate.keys2))
-// .addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
-// predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
- return null;
- }
- }
- }
-
- /**
- * Join function that pairs the two inputs as Tuple2(first, second).
- * NOTE(review): the same outTuple instance is mutated and returned on every
- * call, so any consumer that keeps a reference to an earlier result will see
- * it overwritten by later ones.
- */
- public static final class DefaultJoinFunction<I1, I2> implements
- JoinFunction<I1, I2, Tuple2<I1, I2>> {
-
- private static final long serialVersionUID = 1L;
- private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
-
- @Override
- public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
- outTuple.f0 = first;
- outTuple.f1 = second;
- return outTuple;
- }
- }
-
-// private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
-// JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
-// return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
-// }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
deleted file mode 100644
index 9da00f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * Base class for two-input temporal operators. It holds both inputs, the
- * window size and slide interval, and one timestamp wrapper per input; the
- * onWindow(...) methods fill these in and then ask the subclass for the next
- * builder step via createNextWindowOperator().
- */
-public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
-
- public final DataStream<I1> input1;
- public final DataStream<I2> input2;
-
- public long windowSize;
- public long slideInterval;
-
- public TimestampWrapper<I1> timeStamp1;
- public TimestampWrapper<I2> timeStamp2;
-
- // Throws NullPointerException if either input is null.
- public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
- if (input1 == null || input2 == null) {
- throw new NullPointerException();
- }
- this.input1 = input1;
- this.input2 = input2;
- }
-
- /**
- * Continues a temporal transformation.<br/>
- * Defines the window size on which the two DataStreams will be transformed.
- * To define sliding windows call {@link TemporalWindow#every} on the
- * resulting operator. Both inputs use the system-time timestamp wrapper.
- *
- * @param length
- * The size of the window, expressed in {@code timeUnit}
- * (converted to milliseconds internally).
- * @param timeUnit
- * The unit of time in which {@code length} is given.
- * @return An incomplete temporal transformation.
- */
- @SuppressWarnings("unchecked")
- public OP onWindow(long length, TimeUnit timeUnit) {
- return onWindow(timeUnit.toMillis(length),
- (TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
- (TimestampWrapper<I2>) SystemTimestamp.getWrapper());
- }
-
- /**
- * Continues a temporal transformation.<br/>
- * Defines the window size on which the two DataStreams will be
- * transformed. To define sliding windows call {@link TemporalWindow#every}
- * on the resulting operator. Equivalent to calling the four-argument
- * variant with a start time of 0.
- *
- * @param length
- * The size of the window in milliseconds.
- * @param timeStamp1
- * The timestamp used to extract time from the elements of the
- * first data stream.
- * @param timeStamp2
- * The timestamp used to extract time from the elements of the
- * second data stream.
- * @return An incomplete temporal transformation.
- */
- public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) {
- return onWindow(length, timeStamp1, timeStamp2, 0);
- }
-
- /**
- * Continues a temporal transformation.<br/>
- * Defines the window size on which the two DataStreams will be
- * transformed. To define sliding windows call {@link TemporalWindow#every}
- * on the resulting operator.
- *
- * @param length
- * The size of the window in milliseconds.
- * @param timeStamp1
- * The timestamp used to extract time from the elements of the
- * first data stream.
- * @param timeStamp2
- * The timestamp used to extract time from the elements of the
- * second data stream.
- * @param startTime
- * The start time to measure the first window
- * @return An incomplete temporal transformation.
- */
- public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2,
- long startTime) {
- return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime),
- new TimestampWrapper<I2>(timeStamp2, startTime));
- }
-
- // Common implementation: window size and slide interval are both set to
- // length (tumbling windows) until every(...) overrides the slide interval.
- private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
- TimestampWrapper<I2> timeStamp2) {
-
- this.windowSize = length;
- this.slideInterval = length;
-
- this.timeStamp1 = timeStamp1;
- this.timeStamp2 = timeStamp2;
-
- return createNextWindowOperator();
- }
-
- // Returns the builder step that follows onWindow(...).
- protected abstract OP createNextWindowOperator();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
deleted file mode 100644
index 49c75c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Builder step of a temporal operator that allows the slide interval to be
- * set after the window size has been defined.
- *
- * @param <T> the concrete builder type returned for call chaining
- */
-public interface TemporalWindow<T> {
-
- /**
- * Defines the slide interval for this temporal operator
- *
- * @param length
- * Length of the slide interval, expressed in {@code timeUnit}
- * @param timeUnit
- * Unit of time in which {@code length} is given
- * @return The temporal operator with slide interval specified
- */
- public T every(long length, TimeUnit timeUnit);
-
- /**
- * Defines the slide interval for this temporal operator
- *
- * @param length
- * Length of the slide interval (presumably in milliseconds,
- * matching the window size — confirm per implementation)
- * @return The temporal operator with slide interval specified
- */
- public T every(long length);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index 042fe18..edd8a34 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -18,37 +18,17 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
-public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunction<T, T, K, W> {
+public class ReduceWindowFunction<K, W extends Window, T>
+ extends WrappingFunction<ReduceFunction<T>>
+ implements WindowFunction<T, T, K, W> {
private static final long serialVersionUID = 1L;
- private final ReduceFunction<T> reduceFunction;
-
public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {
- super.setRuntimeContext(ctx);
- FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- FunctionUtils.openFunction(reduceFunction, parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- FunctionUtils.closeFunction(reduceFunction);
+ super(reduceFunction);
}
@Override
@@ -59,7 +39,7 @@ public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunc
if (result == null) {
result = v;
} else {
- result = reduceFunction.reduce(result, v);
+ result = wrappedFunction.reduce(result, v);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
new file mode 100644
index 0000000..c06a608
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -0,0 +1,372 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.api;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for windowed {@code coGroup()}, {@code join()} and self-join of DataStreams.
+ *
+ * <p>All tests run with parallelism 1 and take each element's timestamp from the tuple's integer
+ * field. They use tumbling time windows of 3 ms, so timestamps 0-2, 3-5 and 6-8 end up in
+ * separate windows.
+ */
+public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * Values received by {@link ResultSink}. The sink is invoked by the Flink runtime, possibly
+	 * from a thread other than the test thread, so the list is synchronized. It is replaced at
+	 * the start of every test.
+	 */
+	private static List<String> testResults;
+
+	@Test
+	public void testCoGroup() throws Exception {
+
+		resetResults();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor());
+
+		DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+
+				ctx.collect(Tuple2.of("b", 3));
+
+				ctx.collect(Tuple2.of("c", 6));
+				ctx.collect(Tuple2.of("c", 7));
+				ctx.collect(Tuple2.of("c", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor());
+
+
+		source1.coGroup(source2)
+				.where(new Tuple2KeyExtractor())
+				.equalTo(new Tuple2KeyExtractor())
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void coGroup(Iterable<Tuple2<String, Integer>> first,
+							Iterable<Tuple2<String, Integer>> second,
+							Collector<String> out) throws Exception {
+						// Output format: "F:" + all first-input tuples + " S:" + all second-input tuples.
+						StringBuilder result = new StringBuilder();
+						result.append("F:");
+						for (Tuple2<String, Integer> t : first) {
+							result.append(t.toString());
+						}
+						result.append(" S:");
+						for (Tuple2<String, Integer> t : second) {
+							result.append(t.toString());
+						}
+						out.collect(result.toString());
+					}
+				})
+				.addSink(new ResultSink());
+
+		env.execute("CoGroup Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+				"F:(b,3)(b,4)(b,5) S:(b,3)",
+				"F:(a,6)(a,7)(a,8) S:",
+				"F: S:(c,6)(c,7)(c,8)");
+
+		assertSameElements(expectedResult, testResults);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+
+		resetResults();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor());
+
+		DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "u", 0));
+				ctx.collect(Tuple3.of("a", "w", 1));
+
+				ctx.collect(Tuple3.of("b", "i", 3));
+				ctx.collect(Tuple3.of("b", "k", 5));
+
+				ctx.collect(Tuple3.of("a", "x", 6));
+				ctx.collect(Tuple3.of("a", "z", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor());
+
+
+		source1.join(source2)
+				.where(new Tuple3KeyExtractor())
+				.equalTo(new Tuple3KeyExtractor())
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second;
+					}
+				})
+				.addSink(new ResultSink());
+
+		env.execute("Join Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(a,x,0):(a,u,0)",
+				"(a,x,0):(a,w,1)",
+				"(a,y,1):(a,u,0)",
+				"(a,y,1):(a,w,1)",
+				"(a,z,2):(a,u,0)",
+				"(a,z,2):(a,w,1)",
+				"(b,u,3):(b,i,3)",
+				"(b,u,3):(b,k,5)",
+				"(b,w,5):(b,i,3)",
+				"(b,w,5):(b,k,5)",
+				"(a,i,6):(a,x,6)",
+				"(a,i,6):(a,z,8)",
+				"(a,j,7):(a,x,6)",
+				"(a,j,7):(a,z,8)",
+				"(a,k,8):(a,x,6)",
+				"(a,k,8):(a,z,8)");
+
+		assertSameElements(expectedResult, testResults);
+	}
+
+	@Test
+	public void testSelfJoin() throws Exception {
+
+		resetResults();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor());
+
+		source1.join(source1)
+				.where(new Tuple3KeyExtractor())
+				.equalTo(new Tuple3KeyExtractor())
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second;
+					}
+				})
+				.addSink(new ResultSink());
+
+		env.execute("Self-Join Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(a,x,0):(a,x,0)",
+				"(a,x,0):(a,y,1)",
+				"(a,x,0):(a,z,2)",
+				"(a,y,1):(a,x,0)",
+				"(a,y,1):(a,y,1)",
+				"(a,y,1):(a,z,2)",
+				"(a,z,2):(a,x,0)",
+				"(a,z,2):(a,y,1)",
+				"(a,z,2):(a,z,2)",
+				"(b,u,3):(b,u,3)",
+				"(b,u,3):(b,w,5)",
+				"(b,w,5):(b,u,3)",
+				"(b,w,5):(b,w,5)",
+				"(a,i,6):(a,i,6)",
+				"(a,i,6):(a,j,7)",
+				"(a,i,6):(a,k,8)",
+				"(a,j,7):(a,i,6)",
+				"(a,j,7):(a,j,7)",
+				"(a,j,7):(a,k,8)",
+				"(a,k,8):(a,i,6)",
+				"(a,k,8):(a,j,7)",
+				"(a,k,8):(a,k,8)");
+
+		assertSameElements(expectedResult, testResults);
+	}
+
+	/** Replaces {@link #testResults} with a fresh, synchronized list. */
+	private static void resetResults() {
+		testResults = Collections.synchronizedList(Lists.<String>newArrayList());
+	}
+
+	/**
+	 * Asserts that both lists contain the same elements regardless of order. Sorted copies are
+	 * compared so that neither argument is mutated.
+	 */
+	private static void assertSameElements(List<String> expected, List<String> actual) {
+		List<String> sortedExpected = Lists.newArrayList(expected);
+		List<String> sortedActual = Lists.newArrayList(actual);
+		Collections.sort(sortedExpected);
+		Collections.sort(sortedActual);
+		Assert.assertEquals(sortedExpected, sortedActual);
+	}
+
+	/**
+	 * Sink that appends every received value to {@link #testResults}. It is static, so it does
+	 * not capture the enclosing test instance when serialized.
+	 */
+	private static class ResultSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) throws Exception {
+			testResults.add(value);
+		}
+	}
+
+	/**
+	 * Uses the tuple's integer field {@code f1} as timestamp and emits a watermark one unit
+	 * below it after each element.
+	 */
+	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
+			return element.f1;
+		}
+
+		@Override
+		public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+			return element.f1 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	/**
+	 * Uses the tuple's integer field {@code f2} as timestamp and emits a watermark one unit
+	 * below it after each element.
+	 */
+	private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) {
+			return element.f2;
+		}
+
+		@Override
+		public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
+			return element.f2 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	/** Keys a {@code Tuple2} by its string field {@code f0}. */
+	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+	/** Keys a {@code Tuple3} by its first string field {@code f0}. */
+	private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple3<String, String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
deleted file mode 100644
index 7d2a131..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
-
- private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
- private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
- private static class MyTimestamp<T> implements Timestamp<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(T value) {
- return 101L;
- }
- }
-
- /**
- * TODO: enable once new join operator is ready
- * @throws Exception
- */
- @Ignore
- @Test
- public void test() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setBufferTimeout(1);
-
- TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =
- new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
- TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> crossResultSink =
- new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
-
- ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
- ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
-
- in1.add(new Tuple2<Integer, String>(10, "a"));
- in1.add(new Tuple2<Integer, String>(20, "b"));
- in1.add(new Tuple2<Integer, String>(20, "x"));
- in1.add(new Tuple2<Integer, String>(0, "y"));
-
- in2.add(new Tuple1<Integer>(0));
- in2.add(new Tuple1<Integer>(5));
- in2.add(new Tuple1<Integer>(20));
-
- joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "b"), 20));
- joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "x"), 20));
- joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(0, "y"), 0));
-
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(10, "a"), 0));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(10, "a"), 5));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(10, "a"), 20));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "b"), 0));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "b"), 5));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "b"), 20));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "x"), 0));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "x"), 5));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(20, "x"), 20));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(0, "y"), 0));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(0, "y"), 5));
- crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
- new Tuple2<Integer, String>(0, "y"), 20));
-
- DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
- DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
-
- inStream1
- .join(inStream2)
- .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
- new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
- .map(new ResultMap())
- .addSink(joinResultSink);
-
- env.execute();
-
- assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
- new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResultSink.getResult()));
- assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults),
- new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResultSink.getResult()));
- }
-
- private static class ResultMap implements
- MapFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>,
- Tuple2<Tuple2<Integer, String>, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Tuple2<Integer, String>, Integer> map(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) throws Exception {
- return new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index fc9de1d..c116c01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -47,67 +47,6 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
private static List<String> expected;
/**
- * TODO: enable once new join operator is implemented
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Ignore
- @Test
- public void sameDataStreamTest() {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
- Timestamp<Integer> timeStamp = new IntegerTimestamp();
-
- KeySelector keySelector = new KeySelector<Integer, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- };
-
- DataStream<Integer> src = env.fromElements(1, 3, 5);
-
- @SuppressWarnings("unused")
- DataStreamSink<Tuple2<Integer, Integer>> dataStream =
- src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
- .map(new MapFunction<Tuple2<Integer, Integer>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Tuple2<Integer, Integer> value) throws Exception {
- return value.toString();
- }
- })
- .addSink(resultSink);
-
-
- try {
- env.execute();
-
- expected = new ArrayList<String>();
-
- expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)"));
-
- List<String> result = resultSink.getResult();
-
- Collections.sort(expected);
- Collections.sort(result);
-
- assertEquals(expected, result);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- }
-
- /**
* We connect two different data streams in a chain to a CoMap.
*/
@Test