You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:29 UTC
[1/6] flink git commit: [FLINK-4957] Provide API for
TimelyCoFlatMapFunction
Repository: flink
Updated Branches:
refs/heads/master 718f6e4e3 -> 891950eab
[FLINK-4957] Provide API for TimelyCoFlatMapFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891950ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891950ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891950ea
Branch: refs/heads/master
Commit: 891950eabaaed1fdfc1c0c88806f1125b085c4b6
Parents: 0b873ac
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:36:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedStreams.java | 63 ++++++++++++++++++++
.../streaming/api/scala/ConnectedStreams.scala | 30 +++++++++-
2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 50ef95b..dc763cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -26,9 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import static java.util.Objects.requireNonNull;
@@ -230,6 +233,66 @@ public class ConnectedStreams<IN1, IN2> {
return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
}
+ /**
+ * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * thereby creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the streams and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * can be used to gain access to features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R> flatMap(
+ TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+ TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+ Utils.getCallLocationName(), true);
+
+ return flatMap(coFlatMapper, outTypeInfo);
+ }
+
+ /**
+ * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * thereby creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the streams and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * can be used to gain access to features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> flatMap(
+ TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+ TypeInformation<R> outputType) {
+
+ CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
+ inputStream1.clean(coFlatMapper));
+
+ return transform("Co-Flat Map", outputType, operator);
+ }
+
+
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String functionName,
TypeInformation<R> outTypeInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 141625e..50526b5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction}
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.util.Collector
@@ -101,6 +101,34 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
/**
+ * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams,
+ * thereby creating a transformed output stream.
+ *
+ * The function will be called for every element in the streams and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+ * can be used to gain access to features provided by the
+ * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+ *
+ * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element
+ * in the stream.
+ *
+ * @return The transformed [[DataStream]].
+ */
+ def flatMap[R: TypeInformation](
+ coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+
+ if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.")
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+ }
+
+
+ /**
* Applies a CoFlatMap transformation on these connected streams.
*
* The transformation calls [[CoFlatMapFunction#flatMap1]] for each element
[3/6] flink git commit: [hotfix] Add Check for Keyed Operator in
getInternalTimerService()
Posted by al...@apache.org.
[hotfix] Add Check for Keyed Operator in getInternalTimerService()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/132d8f14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/132d8f14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/132d8f14
Branch: refs/heads/master
Commit: 132d8f146c111ca7acf890fc2e2950e4931b2c7b
Parents: 718f6e4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Nov 7 11:52:38 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../flink/streaming/api/operators/AbstractStreamOperator.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/132d8f14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a659866..7b555b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -784,6 +784,9 @@ public abstract class AbstractStreamOperator<OUT>
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable) {
+ if (getKeyedStateBackend() == null) {
+ throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
+ }
@SuppressWarnings("unchecked")
HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
[6/6] flink git commit: [FLINK-4952] [scala] Add
KeyedStream.flatMap(TimelyFlatMapFunction)
Posted by al...@apache.org.
[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703
Branch: refs/heads/master
Commit: f0ef370399638689c2e1adc54a3acf0afab67a17
Parents: b9173b3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:47:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 31 ++++++++++++++++--
.../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++-
2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 922ad20..c938f5b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
Utils.getCallLocationName(),
true);
+ return flatMap(flatMapper, outType);
+ }
+
+ /**
+ * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * can be used to gain access to features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code TimelyFlatMapFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> flatMap(
+ TimelyFlatMapFunction<T, R> flatMapper,
+ TypeInformation<R> outputType) {
+
StreamTimelyFlatMap<KEY, T, R> operator =
new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
- return transform("Flat Map", outType, operator);
-
+ return transform("Flat Map", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 68eebea..1971359 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*/
@Internal
def getKeyType = javaStream.getKeyType()
+
+
+ // ------------------------------------------------------------------------
+ // basic transformations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * The function will be called for every element in the stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+ * can be used to gain access to features provided by the
+ * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+ *
+ * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+ * in the stream.
+ */
+ def flatMap[R: TypeInformation](
+ flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+
+ if (flatMapper == null) {
+ throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+ }
+
+ asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+ }
// ------------------------------------------------------------------------
// Windowing
[4/6] flink git commit: [FLINK-4951] Fix Javadoc of
KeyedStream.flatMap(TimelyFlatMapFunction)
Posted by al...@apache.org.
[FLINK-4951] Fix Javadoc of KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9173b35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9173b35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9173b35
Branch: refs/heads/master
Commit: b9173b355139f007fb840aa6a196f50b366ad8d8
Parents: 132d8f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 12:21:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 25 ++++++++++----------
1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b9173b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1bce6a2..922ad20 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,10 +20,8 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -175,19 +173,22 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
/**
- * Applies a FlatMap transformation on a {@link DataStream}. The
- * transformation calls a {@link FlatMapFunction} for each element of the
- * DataStream. Each FlatMapFunction call can return any number of elements
- * including none. The user can also extend {@link RichFlatMapFunction} to
- * gain access to other features provided by the
+ * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param flatMapper
- * The FlatMapFunction that is called for each element of the
- * DataStream
+ * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code TimelyFlatMapFunction}.
*
- * @param <R>
- * output type
* @return The transformed {@link DataStream}.
*/
public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
[5/6] flink git commit: [FLINK-4957] Remove Key Serializer Parameter
getInternalTimerService()
Posted by al...@apache.org.
[FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()
It's not needed because we can get the key serializer from the keyed
state backend.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3
Branch: refs/heads/master
Commit: 0b873ac343343fd1d7716d075b54e66324374f47
Parents: 06fb9f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:34:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 2 +-
.../api/operators/AbstractStreamOperator.java | 13 ++++++-------
.../api/operators/StreamTimelyFlatMap.java | 9 ++-------
.../api/operators/co/CoStreamTimelyFlatMap.java | 11 ++---------
.../operators/windowing/WindowOperator.java | 2 +-
.../api/operators/AbstractStreamOperatorTest.java | 1 -
.../api/operators/TimelyFlatMapTest.java | 18 +++++++++---------
.../api/operators/co/TimelyCoFlatMapTest.java | 16 ++++++++--------
8 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c938f5b..4063b60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
TypeInformation<R> outputType) {
StreamTimelyFlatMap<KEY, T, R> operator =
- new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
+ new StreamTimelyFlatMap<>(clean(flatMapper));
return transform("Flat Map", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7b555b7..839abf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT>
*
* @param name The name of the requested timer service. If no service exists under the given
* name a new one will be created and returned.
- * @param keySerializer {@code TypeSerializer} for the keys of the timers.
* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
* @param triggerable The {@link Triggerable} that should be invoked when timers fire
*
- * @param <K> The type of the timer keys.
* @param <N> The type of the timer namespace.
*/
- public <K, N> InternalTimerService<N> getInternalTimerService(
+ public <N> InternalTimerService<N> getInternalTimerService(
String name,
- TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
- Triggerable<K, N> triggerable) {
+ Triggerable<?, N> triggerable) {
if (getKeyedStateBackend() == null) {
throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
}
@SuppressWarnings("unchecked")
- HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
+ HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name);
if (timerService == null) {
timerService = new HeapInternalTimerService<>(
@@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT>
getRuntimeContext().getProcessingTimeService());
timerServices.put(name, timerService);
}
- timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Triggerable rawTriggerable = (Triggerable) triggerable;
+ timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable);
return timerService;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index 962f264..d507ba6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
@@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT>
private static final long serialVersionUID = 1L;
- private final TypeSerializer<K> keySerializer;
-
private transient TimestampedCollector<OUT> collector;
private transient TimerService timerService;
- public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+ public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
- this.keySerializer = keySerializer;
-
chainingStrategy = ChainingStrategy.ALWAYS;
}
@@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT>
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
this.timerService = new SimpleTimerService(internalTimerService);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index df2320f..212aafd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
@@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
private static final long serialVersionUID = 1L;
- private final TypeSerializer<K> keySerializer;
-
private transient TimestampedCollector<OUT> collector;
private transient TimerService timerService;
- public CoStreamTimelyFlatMap(
- TypeSerializer<K> keySerializer,
- TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+ public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
-
- this.keySerializer = keySerializer;
}
@Override
@@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
this.timerService = new SimpleTimerService(internalTimerService);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c465767..229d97d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
timestampedCollector = new TimestampedCollector<>(output);
internalTimerService =
- getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
+ getInternalTimerService("window-timers", windowSerializer, this);
context = new Context(null, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index fd05353..2fb0089 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest {
this.timerService = getInternalTimerService(
"test-timers",
- IntSerializer.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index f3b09eb..46b52ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testCurrentEventTime() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testCurrentProcessingTime() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testEventTimeTimers() throws Exception {
StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testProcessingTimeTimers() throws Exception {
StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testEventTimeTimerWithState() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testProcessingTimeTimerWithState() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger {
public void testSnapshotAndRestore() throws Exception {
StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger {
testHarness.close();
- operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger {
private static final long serialVersionUID = 1L;
private final ValueStateDescriptor<Integer> state =
- new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+ new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
private final TimeDomain timeDomain;
http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index 25808f4..cb5d6c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testCurrentEventTime() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testCurrentProcessingTime() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testEventTimeTimers() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testProcessingTimeTimers() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testEventTimeTimerWithState() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testProcessingTimeTimerWithState() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
public void testSnapshotAndRestore() throws Exception {
CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
testHarness.close();
- operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+ operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
operator,
[2/6] flink git commit: [FLINK-4955] Add Translations Tests for KeyedStream.flatMap(TimelyFlatMapFunction)
Posted by al...@apache.org.
[FLINK-4955] Add Translations Tests for KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06fb9f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06fb9f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06fb9f1b
Branch: refs/heads/master
Commit: 06fb9f1b4f97ade67a23cd3adc8212e7d848de48
Parents: f0ef370
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:58:29 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../flink/streaming/api/DataStreamTest.java | 50 ++++++++++++++++++++
.../streaming/api/scala/DataStreamTest.scala | 28 ++++++++++-
2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 17bea68..5e43120 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -544,6 +546,45 @@ public class DataStreamTest {
assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
}
+ /**
+ * Verifies that {@code flatMap(TimelyFlatMapFunction)} on a keyed stream is translated to a {@code StreamTimelyFlatMap} operator wrapping that function.
+ */
+ @Test
+ public void testTimelyFlatMapTranslation() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> src = env.generateSequence(0, 0); // minimal source; the assertions below never look at its elements
+
+ TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(
+ Long value,
+ TimerService timerService,
+ Collector<Integer> out) throws Exception { // intentionally empty: this test only inspects the translation
+
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ TimeDomain timeDomain,
+ TimerService timerService,
+ Collector<Integer> out) throws Exception { // intentionally empty, same reason as flatMap above
+
+ }
+ };
+
+ DataStream<Integer> flatMapped = src
+ .keyBy(new IdentityKeySelector<Long>()) // key each element by itself so the keyed flatMap overload is used
+ .flatMap(timelyFlatMapFunction);
+
+ flatMapped.addSink(new DiscardingSink<Integer>()); // NOTE(review): presumably needed to complete the topology before inspection - confirm
+
+ assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped)); // the operator must wrap exactly the function passed in
+ assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap); // and must be the timely flat map operator
+ }
+
@Test
public void operatorTest() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -809,6 +850,15 @@ public class DataStreamTest {
}
}
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> { // test helper: key selector that uses each element as its own key
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value; // the element itself is the key
+ }
+ }
+
public static class CustomPOJO {
private String s;
private int i;
http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b73eae8..de8b388 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,10 +22,12 @@ import java.lang
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.{TimeDomain, TimerService}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -315,6 +317,30 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
// TODO check for custom case class
}
+ /**
+ * Verifies that flatMap(TimelyFlatMapFunction) on a keyed stream is translated to a StreamTimelyFlatMap operator wrapping that function.
+ */
+ @Test
+ def testTimelyFlatMapTranslation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val src = env.generateSequence(0, 0) // minimal source; the assertions below never look at its elements
+
+ val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
+ override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ??? // `???` throws NotImplementedError if called; nothing in this test calls it
+ override def onTimer(
+ timestamp: Long,
+ timeDomain: TimeDomain,
+ timerService: TimerService,
+ out: Collector[Int]): Unit = ??? // unimplemented on purpose, same as flatMap above
+ }
+
+ val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction) // key each element by itself, then apply the timely function
+
+ assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped)) // the operator must wrap exactly the function passed in
+ assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]]) // and must be the timely flat map operator
+ }
+
@Test def operatorTest() {
val env = StreamExecutionEnvironment.getExecutionEnvironment