You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:29 UTC
[02/24] flink git commit: [FLINK-2550] [streaming] Rework JoinStreams
and CoGroupStreams to properly implement operator builder syntax
[FLINK-2550] [streaming] Rework JoinStreams and CoGroupStreams to properly implement operator builder syntax
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69dfc40d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69dfc40d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69dfc40d
Branch: refs/heads/master
Commit: 69dfc40d4b2c9f994d0f828d5f26ed27faaeade0
Parents: c24dca5
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:20:02 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200
----------------------------------------------------------------------
.../api/datastream/CoGroupedStreams.java | 176 ++++++++++---------
.../streaming/api/datastream/DataStream.java | 10 +-
.../streaming/api/datastream/JoinedStreams.java | 158 +++++++++--------
.../streaming/api/scala/CoGroupedStreams.scala | 2 +-
.../streaming/api/scala/JoinedStreams.scala | 4 +-
5 files changed, 182 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index e1f1a96..d1da783 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.datastream;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -36,8 +35,11 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import static java.util.Objects.requireNonNull;
+
/**
*{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
* A streaming co-group operation is evaluated over elements in a window.
@@ -64,93 +66,87 @@ import java.util.List;
* .apply(new MyCoGroupFunction());
* } </pre>
*/
-public class CoGroupedStreams {
+public class CoGroupedStreams<T1, T2> {
- /**
- * A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined.
- *
- * @param <T1> Type of the elements from the first input
- * @param <T2> Type of the elements from the second input
- */
- public static class Unspecified<T1, T2> {
- DataStream<T1> input1;
- DataStream<T2> input2;
+ /** The first input stream */
+ private final DataStream<T1> input1;
- protected Unspecified(DataStream<T1> input1,
- DataStream<T2> input2) {
- this.input1 = input1;
- this.input2 = input2;
- }
+ /** The second input stream */
+ private final DataStream<T2> input2;
- /**
- * Specifies a {@link KeySelector} for elements from the first input.
- */
- public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
- return new WithKey<>(input1, input2, input1.clean(keySelector), null);
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the second input.
- */
- public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
- return new WithKey<>(input1, input2, null, input1.clean(keySelector));
- }
+ /**
+ * Creates new CoGrouped data streams, which are the first step towards building a streaming co-group.
+ *
+ * @param input1 The first data stream.
+ * @param input2 The second data stream.
+ */
+ public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+ this.input1 = requireNonNull(input1);
+ this.input2 = requireNonNull(input2);
}
/**
- * A co-group operation that has {@link KeySelector KeySelectors} defined for either both or
- * one input.
- *
- * <p>
- * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
- * and {@link #equalTo(KeySelector)} before you can proceeed with specifying a
- * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
- *
- * @param <T1> Type of the elements from the first input
- * @param <T2> Type of the elements from the second input
- * @param <KEY> Type of the key. This must be the same for both inputs
+ * Specifies a {@link KeySelector} for elements from the first input.
*/
- public static class WithKey<T1, T2, KEY> {
- DataStream<T1> input1;
- DataStream<T2> input2;
+ public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
+ TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+ return new Where<>(input1.clean(keySelector), keyType);
+ }
- KeySelector<T1, KEY> keySelector1;
- KeySelector<T2, KEY> keySelector2;
+ // ------------------------------------------------------------------------
+
+ /**
+ * CoGrouped streams that have the key for one side defined.
+ *
+ * @param <KEY> The type of the key.
+ */
+ public class Where<KEY> {
- protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
- this.input1 = input1;
- this.input2 = input2;
+ private final KeySelector<T1, KEY> keySelector1;
+ private final TypeInformation<KEY> keyType;
+ Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the first input.
- */
- public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
- return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2);
+ this.keyType = keyType;
}
-
+
/**
* Specifies a {@link KeySelector} for elements from the second input.
*/
- public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
- return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input1.clean(keySelector));
+ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
+ TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+ if (!otherKey.equals(this.keyType)) {
+ throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
+ "first key = " + this.keyType + " , second key = " + otherKey);
+ }
+
+ return new EqualTo(input2.clean(keySelector));
}
+ // --------------------------------------------------------------------
+
/**
- * Specifies the window on which the co-group operation works.
+ * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
*/
- public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
- if (keySelector1 == null || keySelector2 == null) {
- throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+ public class EqualTo {
+
+ private final KeySelector<T2, KEY> keySelector2;
+
+ EqualTo(KeySelector<T2, KEY> keySelector2) {
+ this.keySelector2 = requireNonNull(keySelector2);
+ }
+ /**
+ * Specifies the window on which the co-group operation works.
+ */
+ public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
}
}
+ // ------------------------------------------------------------------------
+
/**
* A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
* well as a {@link WindowAssigner}.
@@ -166,6 +162,8 @@ public class CoGroupedStreams {
private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;
+
+ private final TypeInformation<KEY> keyType;
private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
@@ -177,6 +175,7 @@ public class CoGroupedStreams {
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
+ TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
@@ -185,7 +184,8 @@ public class CoGroupedStreams {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
-
+ this.keyType = keyType;
+
this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;
@@ -195,7 +195,8 @@ public class CoGroupedStreams {
* Sets the {@code Trigger} that should be used to trigger window emission.
*/
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, newTrigger, evictor);
}
/**
@@ -206,7 +207,8 @@ public class CoGroupedStreams {
* pre-aggregation of window results cannot be used.
*/
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, trigger, newEvictor);
}
/**
@@ -236,16 +238,21 @@ public class CoGroupedStreams {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
+ UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
+ UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
+
DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
- .returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+ .returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
- .returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+ .returns(unionType);
- WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1
- .union(taggedInput2)
- .keyBy(new UnionKeySelector<>(keySelector1, keySelector2))
+ DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
+
+ // we explicitly create the keyed stream to manually pass the key type information in
+ WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
+ new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
@@ -259,13 +266,10 @@ public class CoGroupedStreams {
}
}
- /**
- * Creates a new co-group operation from the two given inputs.
- */
- public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) {
- return new Unspecified<>(input1, input2);
- }
-
+ // ------------------------------------------------------------------------
+ // Data type and type information for Tagged Union
+ // ------------------------------------------------------------------------
+
/**
* Internal class for implementing tagged union co-group.
*/
@@ -425,7 +429,7 @@ public class CoGroupedStreams {
@Override
public int getLength() {
- return 0;
+ return -1;
}
@Override
@@ -494,6 +498,11 @@ public class CoGroupedStreams {
}
}
+ // ------------------------------------------------------------------------
+ // Utility functions that implement the CoGroup logic based on the tagged
+ // union window reduce
+ // ------------------------------------------------------------------------
+
private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;
@@ -537,6 +546,7 @@ public class CoGroupedStreams {
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+
private static final long serialVersionUID = 1L;
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
@@ -548,8 +558,10 @@ public class CoGroupedStreams {
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
- List<T1> oneValues = Lists.newArrayList();
- List<T2> twoValues = Lists.newArrayList();
+
+ List<T1> oneValues = new ArrayList<>();
+ List<T2> twoValues = new ArrayList<>();
+
for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7e686c7..c15ea9b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -239,7 +239,7 @@ public class DataStream<T> {
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
- public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key){
+ public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
return new KeyedStream<T, K>(this, clean(key));
}
@@ -622,16 +622,16 @@ public class DataStream<T> {
* Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys
* and window can be specified.
*/
- public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) {
- return CoGroupedStreams.createCoGroup(this, otherStream);
+ public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
+ return new CoGroupedStreams<>(this, otherStream);
}
/**
* Creates a join operation. See {@link JoinedStreams} for an example of how the keys
* and window can be specified.
*/
- public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) {
- return JoinedStreams.createJoin(this, otherStream);
+ public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
+ return new JoinedStreams<>(this, otherStream);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index ee848e3..cff9355 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
+import static java.util.Objects.requireNonNull;
+
/**
*{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
* A streaming join operation is evaluated over elements in a window.
@@ -56,92 +59,86 @@ import org.apache.flink.util.Collector;
* .apply(new MyJoinFunction());
* } </pre>
*/
-public class JoinedStreams extends CoGroupedStreams{
+public class JoinedStreams<T1, T2> {
+
+ /** The first input stream */
+ private final DataStream<T1> input1;
+
+ /** The second input stream */
+ private final DataStream<T2> input2;
/**
- * A join operation that does not yet have its {@link KeySelector KeySelectors} defined.
+ * Creates new JoinedStreams data streams, which are the first step towards building a streaming join.
*
- * @param <T1> Type of the elements from the first input
- * @param <T2> Type of the elements from the second input
+ * @param input1 The first data stream.
+ * @param input2 The second data stream.
*/
- public static class Unspecified<T1, T2> {
- DataStream<T1> input1;
- DataStream<T2> input2;
-
- protected Unspecified(DataStream<T1> input1,
- DataStream<T2> input2) {
- this.input1 = input1;
- this.input2 = input2;
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the first input.
- */
- public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
- return new WithKey<>(input1, input2, keySelector, null);
- }
+ public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+ this.input1 = requireNonNull(input1);
+ this.input2 = requireNonNull(input2);
+ }
- /**
- * Specifies a {@link KeySelector} for elements from the second input.
- */
- public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
- return new WithKey<>(input1, input2, null, keySelector);
- }
+ /**
+ * Specifies a {@link KeySelector} for elements from the first input.
+ */
+ public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
+ TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+ return new Where<>(input1.clean(keySelector), keyType);
}
+ // ------------------------------------------------------------------------
+
/**
- * A join operation that has {@link KeySelector KeySelectors} defined for either both or
- * one input.
- *
- * <p>
- * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
- * and {@link #equalTo(KeySelector)} before you can proceeed with specifying a
- * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+ * Joined streams that have the key for one side defined.
*
- * @param <T1> Type of the elements from the first input
- * @param <T2> Type of the elements from the second input
- * @param <KEY> Type of the key. This must be the same for both inputs
+ * @param <KEY> The type of the key.
*/
- public static class WithKey<T1, T2, KEY> {
- DataStream<T1> input1;
- DataStream<T2> input2;
-
- KeySelector<T1, KEY> keySelector1;
- KeySelector<T2, KEY> keySelector2;
+ public class Where<KEY> {
- protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
- this.input1 = input1;
- this.input2 = input2;
+ private final KeySelector<T1, KEY> keySelector1;
+ private final TypeInformation<KEY> keyType;
+ Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
- }
-
- /**
- * Specifies a {@link KeySelector} for elements from the first input.
- */
- public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) {
- return new JoinedStreams.WithKey<>(input1, input2, keySelector, keySelector2);
+ this.keyType = keyType;
}
/**
* Specifies a {@link KeySelector} for elements from the second input.
*/
- public JoinedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) {
- return new JoinedStreams.WithKey<>(input1, input2, keySelector1, keySelector);
+ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
+ TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+ if (!otherKey.equals(this.keyType)) {
+ throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
+ "first key = " + this.keyType + " , second key = " + otherKey);
+ }
+
+ return new EqualTo(input2.clean(keySelector));
}
+ // --------------------------------------------------------------------
+
/**
- * Specifies the window on which the join operation works.
+ * A join operation that has {@link KeySelector KeySelectors} defined for both inputs.
*/
- public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
- if (keySelector1 == null || keySelector2 == null) {
- throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+ public class EqualTo {
+ private final KeySelector<T2, KEY> keySelector2;
+
+ EqualTo(KeySelector<T2, KEY> keySelector2) {
+ this.keySelector2 = requireNonNull(keySelector2);
+ }
+
+ /**
+ * Specifies the window on which the join operation works.
+ */
+ public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
}
}
+
+ // ------------------------------------------------------------------------
/**
* A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
@@ -153,11 +150,13 @@ public class JoinedStreams extends CoGroupedStreams{
* @param <W> Type of {@link Window} on which the join operation works.
*/
public static class WithWindow<T1, T2, KEY, W extends Window> {
+
private final DataStream<T1> input1;
private final DataStream<T2> input2;
private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;
+ private final TypeInformation<KEY> keyType;
private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
@@ -169,16 +168,20 @@ public class JoinedStreams extends CoGroupedStreams{
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
+ TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
- this.input1 = input1;
- this.input2 = input2;
-
- this.keySelector1 = keySelector1;
- this.keySelector2 = keySelector2;
-
- this.windowAssigner = windowAssigner;
+
+ this.input1 = requireNonNull(input1);
+ this.input2 = requireNonNull(input2);
+
+ this.keySelector1 = requireNonNull(keySelector1);
+ this.keySelector2 = requireNonNull(keySelector2);
+ this.keyType = requireNonNull(keyType);
+
+ this.windowAssigner = requireNonNull(windowAssigner);
+
this.trigger = trigger;
this.evictor = evictor;
}
@@ -187,7 +190,8 @@ public class JoinedStreams extends CoGroupedStreams{
* Sets the {@code Trigger} that should be used to trigger window emission.
*/
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, newTrigger, evictor);
}
/**
@@ -198,7 +202,8 @@ public class JoinedStreams extends CoGroupedStreams{
* pre-aggregation of window results cannot be used.
*/
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, trigger, newEvictor);
}
/**
@@ -213,7 +218,7 @@ public class JoinedStreams extends CoGroupedStreams{
true,
input1.getType(),
input2.getType(),
- "CoGroup",
+ "Join",
false);
return apply(function, resultType);
@@ -249,7 +254,7 @@ public class JoinedStreams extends CoGroupedStreams{
true,
input1.getType(),
input2.getType(),
- "CoGroup",
+ "Join",
false);
return apply(function, resultType);
@@ -273,13 +278,10 @@ public class JoinedStreams extends CoGroupedStreams{
}
}
-
- /**
- * Creates a new join operation from the two given inputs.
- */
- public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> input1, DataStream<T2> input2) {
- return new Unspecified<>(input1, input2);
- }
+
+ // ------------------------------------------------------------------------
+ // Implementation of the functions
+ // ------------------------------------------------------------------------
/**
* CoGroup function that does a nested-loop join to get the join result.
http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 0164b92..e676f81 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -270,7 +270,7 @@ object CoGroupedStreams {
*/
def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
- val coGroup = JavaCoGroupedStreams.createCoGroup(input1.getJavaStream, input2.getJavaStream)
+ val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
coGroup
.where(keySelector1)
http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 2fda32d..c259724 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -263,7 +263,7 @@ object JoinedStreams {
*/
def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
- val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+ val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
join
.where(keySelector1)
@@ -280,7 +280,7 @@ object JoinedStreams {
*/
def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
- val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+ val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
join
.where(keySelector1)