You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:03 UTC
[07/12] flink git commit: Harmonize generic parameter names in Stream API classes
Harmonize generic parameter names in Stream API classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86c45bfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86c45bfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86c45bfa
Branch: refs/heads/master
Commit: 86c45bfa2c760ca99741fa866f777730514c7986
Parents: 05d2138
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 24 16:40:46 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200
----------------------------------------------------------------------
.../api/datastream/GroupedDataStream.java | 68 +++---
.../api/datastream/KeyedDataStream.java | 18 +-
.../api/datastream/KeyedWindowDataStream.java | 22 +-
.../api/datastream/WindowedDataStream.java | 239 +++++++++----------
4 files changed, 173 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 50bf341..fde5a6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
* partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
* {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
* get additional functionality by the grouping.
- *
- * @param <OUT>
- * The output type of the {@link GroupedDataStream}.
+ *
+ * @param <T> The type of the elements in the Grouped Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
*/
-public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
+public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
/**
* Creates a new {@link GroupedDataStream}, group inclusion is determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* @param dataStream Base stream of data
* @param keySelector Function for determining group inclusion
*/
- public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) {
+ public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
super(dataStream, keySelector);
}
@@ -64,8 +64,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* element of the input values with the same key.
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return transform("Grouped Reduce", getType(), new StreamGroupedReduce<OUT>(
+ public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+ return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
clean(reducer), keySelector));
}
@@ -82,12 +82,12 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The initialValue passed to the folders for each key.
* @return The transformed DataStream.
*/
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
+ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), true);
- return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder),
+ return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
keySelector, initialValue));
}
@@ -100,8 +100,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to sum
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
- return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig()));
+ public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+ return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
}
/**
@@ -117,8 +117,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* applied.
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> sum(String field) {
- return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig()));
+ public SingleOutputStreamOperator<T, ?> sum(String field) {
+ return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
}
/**
@@ -130,8 +130,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to minimize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
- return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN,
+ public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
getExecutionConfig()));
}
@@ -148,8 +148,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* applied.
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> min(String field) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN,
+ public SingleOutputStreamOperator<T, ?> min(String field) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
false, getExecutionConfig()));
}
@@ -162,8 +162,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to maximize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
- return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX,
+ public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
getExecutionConfig()));
}
@@ -180,8 +180,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* applied.
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> max(String field) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX,
+ public SingleOutputStreamOperator<T, ?> max(String field) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
false, getExecutionConfig()));
}
@@ -202,7 +202,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* @return The transformed DataStream.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+ public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY,
first, getExecutionConfig()));
}
@@ -223,8 +223,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* be returned
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY,
+ public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY,
first, getExecutionConfig()));
}
@@ -238,7 +238,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to minimize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
@@ -252,7 +252,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to minimize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) {
+ public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
@@ -270,8 +270,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* minimal value, otherwise returns the last
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first,
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
getExecutionConfig()));
}
@@ -285,7 +285,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to maximize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
@@ -299,7 +299,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* The position in the data point to maximize
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) {
+ public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
@@ -317,13 +317,13 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
- public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
getExecutionConfig()));
}
- protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
- StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
+ protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+ StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector);
return transform("Grouped Aggregation", getType(), operator);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index a32cf53..611953e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -34,11 +34,11 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
*
*
* @param <T> The type of the elements in the Keyed Stream.
- * @param <K> The type of the key in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
*/
-public class KeyedDataStream<T, K> extends DataStream<T> {
+public class KeyedDataStream<T, KEY> extends DataStream<T> {
- protected final KeySelector<T, K> keySelector;
+ protected final KeySelector<T, KEY> keySelector;
/**
* Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
@@ -49,13 +49,13 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
* @param keySelector
* Function for determining state partitions
*/
- public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
+ public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
this.keySelector = keySelector;
}
- public KeySelector<T, K> getKeySelector() {
+ public KeySelector<T, KEY> getKeySelector() {
return this.keySelector;
}
@@ -98,8 +98,8 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
* @param policy The policy that defines the window.
* @return The windows data stream.
*/
- public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
- return new KeyedWindowDataStream<T, K>(this, policy);
+ public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
+ return new KeyedWindowDataStream<T, KEY>(this, policy);
}
/**
@@ -112,7 +112,7 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
* @param slide The additional policy defining the slide of the window.
* @return The windows data stream.
*/
- public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) {
- return new KeyedWindowDataStream<T, K>(this, window, slide);
+ public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
+ return new KeyedWindowDataStream<T, KEY>(this, window, slide);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 37151d7..711a959 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -43,13 +43,13 @@ import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
* KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
* over the window into one single operation.
*
- * @param <Type> The type of elements in the stream.
- * @param <Key> The type of the key by which elements are grouped.
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
*/
-public class KeyedWindowDataStream<Type, Key> {
+public class KeyedWindowDataStream<T, K> {
/** The keyed data stream that is windowed by this stream */
- private final KeyedDataStream<Type, Key> input;
+ private final KeyedDataStream<T, K> input;
/** The core window policy */
private final WindowPolicy windowPolicy;
@@ -58,11 +58,11 @@ public class KeyedWindowDataStream<Type, Key> {
private final WindowPolicy slidePolicy;
- public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) {
+ public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) {
this(input, windowPolicy, null);
}
- public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+ public KeyedWindowDataStream(KeyedDataStream<T, K> input,
WindowPolicy windowPolicy, WindowPolicy slidePolicy)
{
TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
@@ -91,7 +91,7 @@ public class KeyedWindowDataStream<Type, Key> {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
- public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+ public DataStream<T> reduceWindow(ReduceFunction<T> function) {
String callLocation = Utils.getCallLocationName();
return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
}
@@ -107,10 +107,10 @@ public class KeyedWindowDataStream<Type, Key> {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) {
+ public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K> function) {
String callLocation = Utils.getCallLocationName();
- TypeInformation<Type> inType = input.getType();
+ TypeInformation<T> inType = input.getType();
TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false);
@@ -125,9 +125,9 @@ public class KeyedWindowDataStream<Type, Key> {
Function function, TypeInformation<Result> resultType, String functionName) {
String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
- KeySelector<Type, Key> keySel = input.getKeySelector();
+ KeySelector<T, K> keySel = input.getKeySelector();
- OneInputStreamOperator<Type, Result> operator =
+ OneInputStreamOperator<T, Result> operator =
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
return input.transform(opName, resultType, operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 1226adf..ef6f53b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -80,46 +80,45 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
* can be applied to the windows. The results of these transformations are also
* WindowedDataStreams of the same discretisation unit.
*
- * @param <OUT>
- * The output type of the {@link WindowedDataStream}
+ * @param <T> The output type of the {@link WindowedDataStream}
*/
-public class WindowedDataStream<OUT> {
+public class WindowedDataStream<T> {
- protected DataStream<OUT> dataStream;
+ protected DataStream<T> dataStream;
protected boolean isLocal = false;
- protected KeySelector<OUT, ?> discretizerKey;
- protected KeySelector<OUT, ?> groupByKey;
+ protected KeySelector<T, ?> discretizerKey;
+ protected KeySelector<T, ?> groupByKey;
- protected WindowingHelper<OUT> triggerHelper;
- protected WindowingHelper<OUT> evictionHelper;
+ protected WindowingHelper<T> triggerHelper;
+ protected WindowingHelper<T> evictionHelper;
- protected TriggerPolicy<OUT> userTrigger;
- protected EvictionPolicy<OUT> userEvicter;
+ protected TriggerPolicy<T> userTrigger;
+ protected EvictionPolicy<T> userEvicter;
- protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) {
+ protected WindowedDataStream(DataStream<T> dataStream, WindowingHelper<T> policyHelper) {
this.dataStream = dataStream;
this.triggerHelper = policyHelper;
if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
+ this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
}
}
- protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger,
- EvictionPolicy<OUT> evicter) {
+ protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> trigger,
+ EvictionPolicy<T> evicter) {
this.dataStream = dataStream;
this.userTrigger = trigger;
this.userEvicter = evicter;
if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
+ this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
}
}
- protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+ protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
this.dataStream = windowedDataStream.dataStream;
this.discretizerKey = windowedDataStream.discretizerKey;
this.groupByKey = windowedDataStream.groupByKey;
@@ -148,9 +147,9 @@ public class WindowedDataStream<OUT> {
* @return The windowed data stream with triggering set
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+ public WindowedDataStream<T> every(WindowingHelper policyHelper) {
policyHelper.setExecutionConfig(getExecutionConfig());
- WindowedDataStream<OUT> ret = this.copy();
+ WindowedDataStream<T> ret = this.copy();
if (ret.evictionHelper == null) {
ret.evictionHelper = ret.triggerHelper;
ret.triggerHelper = policyHelper;
@@ -171,11 +170,11 @@ public class WindowedDataStream<OUT> {
* The position of the fields to group by.
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<OUT> groupBy(int... fields) {
+ public WindowedDataStream<T> groupBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+ return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
- return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
}
@@ -195,8 +194,8 @@ public class WindowedDataStream<OUT> {
* The fields to group by
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<OUT> groupBy(String... fields) {
- return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+ public WindowedDataStream<T> groupBy(String... fields) {
+ return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
/**
@@ -211,13 +210,13 @@ public class WindowedDataStream<OUT> {
* The keySelector used to extract the key for grouping.
* @return The grouped {@link WindowedDataStream}
*/
- public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
- WindowedDataStream<OUT> ret = this.copy();
+ public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
+ WindowedDataStream<T> ret = this.copy();
ret.groupByKey = keySelector;
return ret;
}
- private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
+ private WindowedDataStream<T> groupBy(Keys<T> keys) {
return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
getExecutionConfig())));
}
@@ -228,8 +227,8 @@ public class WindowedDataStream<OUT> {
*
* @return The WindowedDataStream with local discretisation
*/
- public WindowedDataStream<OUT> local() {
- WindowedDataStream<OUT> out = copy();
+ public WindowedDataStream<T> local() {
+ WindowedDataStream<T> out = copy();
out.isLocal = true;
return out;
}
@@ -241,11 +240,11 @@ public class WindowedDataStream<OUT> {
*
* @return The discretised stream
*/
- public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+ public DataStream<StreamWindow<T>> getDiscretizedStream() {
if (getEviction() instanceof KeepAllEvictionPolicy) {
throw new RuntimeException("Cannot get discretized stream for full stream window");
}
- return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
+ return discretize(WindowTransformation.NONE, new BasicWindowBuffer<T>())
.getDiscretizedStream();
}
@@ -255,7 +254,7 @@ public class WindowedDataStream<OUT> {
*
* @return The data stream consisting of the individual records.
*/
- public DataStream<OUT> flatten() {
+ public DataStream<T> flatten() {
return dataStream;
}
@@ -269,7 +268,7 @@ public class WindowedDataStream<OUT> {
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
- public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
+ public DiscretizedStream<T> reduceWindow(ReduceFunction<T> reduceFunction) {
// We check whether we should apply parallel time discretization, which
// is a more complex exploiting the monotonic properties of time
@@ -281,9 +280,9 @@ public class WindowedDataStream<OUT> {
WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
.with(clean(reduceFunction));
- WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
+ WindowBuffer<T> windowBuffer = getWindowBuffer(transformation);
- DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+ DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
if (windowBuffer instanceof PreAggregator) {
return discretized;
@@ -310,11 +309,11 @@ public class WindowedDataStream<OUT> {
* The output type of the operator
* @return The transformed DataStream
*/
- public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
+ public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction,
TypeInformation<R> outType) {
return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
- new BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType);
+ new BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType);
}
@@ -330,7 +329,7 @@ public class WindowedDataStream<OUT> {
* Initial value given to foldFunction
* @return The transformed DataStream
*/
- public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction) {
+ public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction) {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
getType());
@@ -350,7 +349,7 @@ public class WindowedDataStream<OUT> {
* The function that will be applied to the windows.
* @return The transformed DataStream
*/
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
+ public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction) {
return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
}
@@ -373,7 +372,7 @@ public class WindowedDataStream<OUT> {
* The output type of the operator.
* @return The transformed DataStream
*/
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
+ public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction,
TypeInformation<R> outType) {
return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
@@ -381,24 +380,24 @@ public class WindowedDataStream<OUT> {
outType);
}
- private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
- WindowBuffer<OUT> windowBuffer) {
+ private DiscretizedStream<T> discretize(WindowTransformation transformation,
+ WindowBuffer<T> windowBuffer) {
- OneInputStreamOperator<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
+ OneInputStreamOperator<T, WindowEvent<T>> discretizer = getDiscretizer();
- OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> bufferOperator = getBufferOperator(windowBuffer);
+ OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> bufferOperator = getBufferOperator(windowBuffer);
@SuppressWarnings({ "unchecked", "rawtypes" })
- TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
+ TypeInformation<WindowEvent<T>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
getType(), BasicTypeInfo.INT_TYPE_INFO);
int parallelism = getDiscretizerParallelism(transformation);
- return new DiscretizedStream<OUT>(dataStream
+ return new DiscretizedStream<T>(dataStream
.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
.setParallelism(parallelism)
.transform(windowBuffer.getClass().getSimpleName(),
- new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
+ new StreamWindowTypeInfo<T>(getType()), bufferOperator)
.setParallelism(parallelism), groupByKey, transformation, false);
}
@@ -430,7 +429,7 @@ public class WindowedDataStream<OUT> {
* Reduce function to apply
* @return The transformed stream
*/
- protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
+ protected DiscretizedStream<T> timeReduce(ReduceFunction<T> reduceFunction) {
WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
.with(clean(reduceFunction));
@@ -438,7 +437,7 @@ public class WindowedDataStream<OUT> {
// We get the windowbuffer and set it to emit empty windows with
// sequential IDs. This logic is necessary to merge windows created in
// parallel.
- WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
+ WindowBuffer<T> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
// If there is a groupby for the reduce operation we apply it before the
// discretizers, because we will forward everything afterwards to
@@ -449,7 +448,7 @@ public class WindowedDataStream<OUT> {
// We discretize the stream and call the timeReduce function of the
// discretized stream, we also pass the type of the windowbuffer
- DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+ DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
if (getEviction() instanceof KeepAllEvictionPolicy
&& !(windowBuffer instanceof PreAggregator)) {
@@ -464,27 +463,27 @@ public class WindowedDataStream<OUT> {
/**
* Based on the defined policies, returns the stream discretizer to be used
*/
- private OneInputStreamOperator<OUT, WindowEvent<OUT>> getDiscretizer() {
+ private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() {
if (discretizerKey == null) {
- return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+ return new StreamDiscretizer<T>(getTrigger(), getEviction());
} else if (getTrigger() instanceof CentralActiveTrigger) {
- return new GroupedActiveDiscretizer<OUT>(discretizerKey,
- (CentralActiveTrigger<OUT>) getTrigger(),
- (CloneableEvictionPolicy<OUT>) getEviction());
+ return new GroupedActiveDiscretizer<T>(discretizerKey,
+ (CentralActiveTrigger<T>) getTrigger(),
+ (CloneableEvictionPolicy<T>) getEviction());
} else {
- return new GroupedStreamDiscretizer<OUT>(discretizerKey,
- (CloneableTriggerPolicy<OUT>) getTrigger(),
- (CloneableEvictionPolicy<OUT>) getEviction());
+ return new GroupedStreamDiscretizer<T>(discretizerKey,
+ (CloneableTriggerPolicy<T>) getTrigger(),
+ (CloneableEvictionPolicy<T>) getEviction());
}
}
- private OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> getBufferOperator(
- WindowBuffer<OUT> windowBuffer) {
+ private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> getBufferOperator(
+ WindowBuffer<T> windowBuffer) {
if (discretizerKey == null) {
- return new StreamWindowBuffer<OUT>(windowBuffer);
+ return new StreamWindowBuffer<T>(windowBuffer);
} else {
- return new GroupedWindowBuffer<OUT>(windowBuffer, discretizerKey);
+ return new GroupedWindowBuffer<T>(windowBuffer, discretizerKey);
}
}
@@ -496,43 +495,43 @@ public class WindowedDataStream<OUT> {
*
*/
@SuppressWarnings("unchecked")
- private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
- TriggerPolicy<OUT> trigger = getTrigger();
- EvictionPolicy<OUT> eviction = getEviction();
+ private WindowBuffer<T> getWindowBuffer(WindowTransformation transformation) {
+ TriggerPolicy<T> trigger = getTrigger();
+ EvictionPolicy<T> eviction = getEviction();
if (transformation == WindowTransformation.REDUCEWINDOW) {
if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
if (eviction instanceof KeepAllEvictionPolicy) {
if (groupByKey == null) {
- return new TumblingPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ return new TumblingPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig())).noEvict();
} else {
- return new TumblingGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+ return new TumblingGroupedPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), groupByKey,
getType().createSerializer(getExecutionConfig())).noEvict();
}
} else {
if (groupByKey == null) {
- return new TumblingPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ return new TumblingPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()));
} else {
- return new TumblingGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+ return new TumblingGroupedPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), groupByKey,
getType().createSerializer(getExecutionConfig()));
}
}
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
if (groupByKey == null) {
- return new SlidingCountPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ return new SlidingCountPreReducer<T>(
+ clean((ReduceFunction<T>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());
} else {
- return new SlidingCountGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+ return new SlidingCountGroupedPreReducer<T>(
+ clean((ReduceFunction<T>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()), groupByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());
@@ -540,14 +539,14 @@ public class WindowedDataStream<OUT> {
} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
if (groupByKey == null) {
- return new SlidingTimePreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ return new SlidingTimePreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
} else {
- return new SlidingTimeGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+ return new SlidingTimeGroupedPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()), groupByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
@@ -555,26 +554,26 @@ public class WindowedDataStream<OUT> {
} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
if (groupByKey == null) {
- return new JumpingCountPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ return new JumpingCountPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
} else {
- return new JumpingCountGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+ return new JumpingCountGroupedPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
}
} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
if (groupByKey == null) {
- return new JumpingTimePreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ return new JumpingTimePreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
} else {
- return new JumpingTimeGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+ return new JumpingTimeGroupedPreReducer<T>(
+ (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
@@ -586,7 +585,7 @@ public class WindowedDataStream<OUT> {
throw new RuntimeException(
"Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
} else {
- return new BasicWindowBuffer<OUT>();
+ return new BasicWindowBuffer<T>();
}
}
@@ -598,8 +597,8 @@ public class WindowedDataStream<OUT> {
* The position in the tuple/array to sum
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> sum(int positionToSum) {
- return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig()));
+ public WindowedDataStream<T> sum(int positionToSum) {
+ return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
}
/**
@@ -613,8 +612,8 @@ public class WindowedDataStream<OUT> {
* The field to sum
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> sum(String field) {
- return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig()));
+ public WindowedDataStream<T> sum(String field) {
+ return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
}
/**
@@ -625,8 +624,8 @@ public class WindowedDataStream<OUT> {
* The position to minimize
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> min(int positionToMin) {
- return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN,
+ public WindowedDataStream<T> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
getExecutionConfig()));
}
@@ -642,8 +641,8 @@ public class WindowedDataStream<OUT> {
* applied.
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> min(String field) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN,
+ public WindowedDataStream<T> min(String field) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
false, getExecutionConfig()));
}
@@ -656,7 +655,7 @@ public class WindowedDataStream<OUT> {
* The position to minimize by
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> minBy(int positionToMinBy) {
+ public WindowedDataStream<T> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
@@ -669,7 +668,7 @@ public class WindowedDataStream<OUT> {
* The position to minimize by
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> minBy(String positionToMinBy) {
+ public WindowedDataStream<T> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
@@ -686,8 +685,8 @@ public class WindowedDataStream<OUT> {
* minimum value, otherwise returns the last
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first,
+ public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
getExecutionConfig()));
}
@@ -706,8 +705,8 @@ public class WindowedDataStream<OUT> {
* be returned
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MINBY,
+ public WindowedDataStream<T> minBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MINBY,
first, getExecutionConfig()));
}
@@ -719,8 +718,8 @@ public class WindowedDataStream<OUT> {
* The position to maximize
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> max(int positionToMax) {
- return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX,
+ public WindowedDataStream<T> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
getExecutionConfig()));
}
@@ -736,8 +735,8 @@ public class WindowedDataStream<OUT> {
* applied.
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> max(String field) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX,
+ public WindowedDataStream<T> max(String field) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
false, getExecutionConfig()));
}
@@ -750,7 +749,7 @@ public class WindowedDataStream<OUT> {
* The position to maximize by
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> maxBy(int positionToMaxBy) {
+ public WindowedDataStream<T> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
@@ -763,7 +762,7 @@ public class WindowedDataStream<OUT> {
* The position to maximize by
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> maxBy(String positionToMaxBy) {
+ public WindowedDataStream<T> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
@@ -780,8 +779,8 @@ public class WindowedDataStream<OUT> {
* maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
+ public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
getExecutionConfig()));
}
@@ -800,16 +799,16 @@ public class WindowedDataStream<OUT> {
* be returned
* @return The transformed DataStream.
*/
- public WindowedDataStream<OUT> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY, first,
+ public WindowedDataStream<T> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first,
getExecutionConfig()));
}
- private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) {
+ private WindowedDataStream<T> aggregate(AggregationFunction<T> aggregator) {
return reduceWindow(aggregator);
}
- protected TriggerPolicy<OUT> getTrigger() {
+ protected TriggerPolicy<T> getTrigger() {
if (triggerHelper != null) {
return triggerHelper.toTrigger();
@@ -821,7 +820,7 @@ public class WindowedDataStream<OUT> {
}
- protected EvictionPolicy<OUT> getEviction() {
+ protected EvictionPolicy<T> getEviction() {
if (evictionHelper != null) {
return evictionHelper.toEvict();
@@ -829,7 +828,7 @@ public class WindowedDataStream<OUT> {
if (triggerHelper instanceof Time) {
return triggerHelper.toEvict();
} else {
- return new TumblingEvictionPolicy<OUT>();
+ return new TumblingEvictionPolicy<T>();
}
} else {
return userEvicter;
@@ -854,7 +853,7 @@ public class WindowedDataStream<OUT> {
*
* @return The output type.
*/
- public TypeInformation<OUT> getType() {
+ public TypeInformation<T> getType() {
return dataStream.getType();
}
@@ -862,7 +861,7 @@ public class WindowedDataStream<OUT> {
return dataStream.getExecutionConfig();
}
- protected WindowedDataStream<OUT> copy() {
- return new WindowedDataStream<OUT>(this);
+ protected WindowedDataStream<T> copy() {
+ return new WindowedDataStream<T>(this);
}
}