You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/24 15:21:44 UTC
[2/2] flink git commit: [streaming] Stream operators robustness
improved for serialization
[streaming] Stream operators robustness improved for serialization
This closes #620
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/046f39ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/046f39ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/046f39ea
Branch: refs/heads/master
Commit: 046f39ea448050ea52cb2389b0acad9d58ddafda
Parents: 3f3830d
Author: mbalassi <mb...@apache.org>
Authored: Fri Apr 24 11:04:32 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Apr 24 15:17:11 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/api/graph/StreamNode.java | 2 +-
.../flink/streaming/api/operators/StreamFilter.java | 5 ++---
.../flink/streaming/api/operators/StreamFlatMap.java | 6 ++----
.../apache/flink/streaming/api/operators/StreamFold.java | 7 ++-----
.../flink/streaming/api/operators/StreamGroupedFold.java | 3 +++
.../streaming/api/operators/StreamGroupedReduce.java | 3 ++-
.../flink/streaming/api/operators/StreamOperator.java | 2 +-
.../flink/streaming/api/operators/StreamReduce.java | 5 ++---
.../apache/flink/streaming/api/operators/StreamSink.java | 6 ++----
.../flink/streaming/api/operators/StreamSource.java | 9 ++++-----
.../streaming/api/operators/co/CoStreamFlatMap.java | 9 ++++-----
.../api/operators/co/CoStreamGroupedReduce.java | 11 ++++++++---
.../streaming/api/operators/co/CoStreamOperator.java | 4 ++--
.../flink/streaming/api/operators/co/CoStreamReduce.java | 6 ++++--
.../flink/streaming/api/operators/co/CoStreamWindow.java | 5 ++---
.../streaming/api/operators/windowing/WindowMerger.java | 2 +-
.../api/windowing/policy/MultiEvictionPolicy.java | 2 +-
.../streaming/runtime/partitioner/FieldsPartitioner.java | 2 +-
.../streaming/runtime/partitioner/StreamPartitioner.java | 2 +-
19 files changed, 45 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index cb07f42..576150e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -46,7 +46,7 @@ public class StreamNode implements Serializable {
private Long bufferTimeout = null;
private String operatorName;
- private StreamOperator<?, ?> operator;
+ private transient StreamOperator<?, ?> operator;
private List<OutputSelector<?>> outputSelectors;
private StreamRecordSerializer<?> typeSerializerIn1;
private StreamRecordSerializer<?> typeSerializerIn2;
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index d2cddf6..898f5ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -23,12 +23,10 @@ public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
- FilterFunction<IN> filterFunction;
private boolean collect;
public StreamFilter(FilterFunction<IN> filterFunction) {
super(filterFunction);
- this.filterFunction = filterFunction;
}
@Override
@@ -39,8 +37,9 @@ public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
- collect = filterFunction.filter(nextObject);
+ collect = ((FilterFunction<IN>) userFunction).filter(nextObject);
if (collect) {
collector.collect(nextObject);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index a17b162..2b8a3a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -22,11 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
- private FlatMapFunction<IN, OUT> flatMapper;
-
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
- this.flatMapper = flatMapper;
}
@Override
@@ -37,8 +34,9 @@ public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
- flatMapper.flatMap(nextObject, collector);
+ ((FlatMapFunction<IN, OUT>) userFunction).flatMap(nextObject, collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index fc5f187..542f65c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -24,14 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
- protected FoldFunction<IN, OUT> folder;
private OUT accumulator;
protected TypeSerializer<OUT> outTypeSerializer;
public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue,
TypeInformation<OUT> outTypeInformation) {
super(folder);
- this.folder = folder;
this.accumulator = initialValue;
this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
}
@@ -44,10 +42,9 @@ public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
-
- accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
+ accumulator = ((FoldFunction<IN, OUT>) userFunction).fold(outTypeSerializer.copy(accumulator), nextObject);
collector.collect(accumulator);
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 303f1b3..88c75df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -40,9 +40,12 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
Object key = nextRecord.getKey(keySelector);
OUT accumulator = values.get(key);
+ FoldFunction<IN, OUT> folder = ((FoldFunction<IN, OUT>) userFunction);
+
if (accumulator != null) {
OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
values.put(key, folded);
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index f5c8f21..d254fd4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -36,11 +36,12 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
Object key = keySelector.getKey(nextObject);
IN currentValue = values.get(key);
if (currentValue != null) {
- IN reduced = reducer.reduce(copy(currentValue), nextObject);
+ IN reduced = ((ReduceFunction<IN>) userFunction).reduce(copy(currentValue), nextObject);
values.put(key, reduced);
collector.collect(reduced);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 6d6c793..5cb3ec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -199,7 +199,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
}
public static enum ChainingStrategy {
- ALWAYS, NEVER, HEAD;
+ ALWAYS, NEVER, HEAD
}
public Function getUserFunction() {
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 179d690..fdf1284 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -22,12 +22,10 @@ import org.apache.flink.api.common.functions.ReduceFunction;
public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
- protected ReduceFunction<IN> reducer;
private IN currentValue;
public StreamReduce(ReduceFunction<IN> reducer) {
super(reducer);
- this.reducer = reducer;
currentValue = null;
}
@@ -39,10 +37,11 @@ public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
if (currentValue != null) {
- currentValue = reducer.reduce(copy(currentValue), nextObject);
+ currentValue = ((ReduceFunction<IN>) userFunction).reduce(copy(currentValue), nextObject);
} else {
currentValue = nextObject;
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index d1f93d1..26e37fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -22,11 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
- private SinkFunction<IN> sinkFunction;
-
public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction);
- this.sinkFunction = sinkFunction;
}
@Override
@@ -37,7 +34,8 @@ public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
- sinkFunction.invoke(nextObject);
+ ((SinkFunction<IN>) userFunction).invoke(nextObject);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 8c834f5..ef253ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -25,11 +25,8 @@ public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Seria
private static final long serialVersionUID = 1L;
- private SourceFunction<OUT> sourceFunction;
-
public StreamSource(SourceFunction<OUT> sourceFunction) {
super(sourceFunction);
- this.sourceFunction = sourceFunction;
}
@Override
@@ -38,13 +35,15 @@ public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Seria
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
- sourceFunction.run(collector);
+ ((SourceFunction<OUT>) userFunction).run(collector);
}
@Override
+ @SuppressWarnings("unchecked")
public void cancel() {
super.cancel();
- sourceFunction.cancel();
+ ((SourceFunction<OUT>) userFunction).cancel();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 004a17a..95f089c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -22,11 +22,8 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
-
public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
- this.flatMapper = flatMapper;
}
@Override
@@ -40,14 +37,16 @@ public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, O
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction1() throws Exception {
- flatMapper.flatMap1(reuse1.getObject(), collector);
+ ((CoFlatMapFunction<IN1, IN2, OUT>) userFunction).flatMap1(reuse1.getObject(), collector);
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction2() throws Exception {
- flatMapper.flatMap2(reuse2.getObject(), collector);
+ ((CoFlatMapFunction<IN1, IN2, OUT>) userFunction).flatMap2(reuse2.getObject(), collector);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
index 2ed3b2e..0a76392 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -36,7 +36,6 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
public CoStreamGroupedReduce(CoReduceFunction<IN1, IN2, OUT> coReducer,
KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
super(coReducer);
- this.coReducer = coReducer;
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
values1 = new HashMap<Object, IN1>();
@@ -44,7 +43,9 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
}
@Override
+ @SuppressWarnings("unchecked")
public void handleStream1() throws Exception {
+ CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
Object key = reuse1.getKey(keySelector1);
currentValue1 = values1.get(key);
nextValue1 = reuse1.getObject();
@@ -59,7 +60,9 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
}
@Override
+ @SuppressWarnings("unchecked")
public void handleStream2() throws Exception {
+ CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
Object key = reuse2.getKey(keySelector2);
currentValue2 = values2.get(key);
nextValue2 = reuse2.getObject();
@@ -74,14 +77,16 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction1() throws Exception {
- reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+ reduced1 = ((CoReduceFunction<IN1, IN2, OUT>) userFunction).reduce1(currentValue1, nextValue1);
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction2() throws Exception {
- reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+ reduced2 = ((CoReduceFunction<IN1, IN2, OUT>) userFunction).reduce2(currentValue2, nextValue2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
index 214cb17..5e764ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
@@ -122,11 +122,11 @@ public abstract class CoStreamOperator<IN1, IN2, OUT> extends StreamOperator<IN1
protected void initialize1() {
- };
+ }
protected void initialize2() {
- };
+ }
protected void callUserFunctionAndLogException1() {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index c280174..90aecc7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- protected CoReduceFunction<IN1, IN2, OUT> coReducer;
protected IN1 currentValue1 = null;
protected IN2 currentValue2 = null;
protected IN1 nextValue1 = null;
@@ -30,7 +29,6 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
super(coReducer);
- this.coReducer = coReducer;
currentValue1 = null;
currentValue2 = null;
}
@@ -48,7 +46,9 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction1() throws Exception {
+ CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
if (currentValue1 != null) {
currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
} else {
@@ -58,7 +58,9 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction2() throws Exception {
+ CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
if (currentValue2 != null) {
currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index 0875b7e..78371cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
protected long windowSize;
protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
@@ -46,7 +45,6 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
super(coWindowFunction);
- this.coWindowFunction = coWindowFunction;
this.windowSize = windowSize;
this.slideSize = slideInterval;
this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
@@ -69,6 +67,7 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
}
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
List<IN1> first = new ArrayList<IN1>();
@@ -82,7 +81,7 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
}
if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
- coWindowFunction.coWindow(first, second, collector);
+ ((CoWindowFunction<IN1, IN2, OUT>) userFunction).coWindow(first, second, collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index fc03780..e69257b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -47,8 +47,8 @@ public class WindowMerger<T> extends ChainableStreamOperator<StreamWindow<T>, St
}
}
- @SuppressWarnings("unchecked")
@Override
+ @SuppressWarnings("unchecked")
protected void callUserFunction() throws Exception {
StreamWindow<T> nextWindow = nextObject;
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
index 0ad1605..79e8119 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
@@ -53,7 +53,7 @@ public class MultiEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
* </ul>
*/
public enum EvictionStrategy {
- MIN, MAX, SUM, PRIORITY;
+ MIN, MAX, SUM, PRIORITY
}
private List<EvictionPolicy<DATA>> allEvictionPolicies;
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
index f44bd12..08c431b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class FieldsPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
- private int[] returnArray = new int[1];;
+ private int[] returnArray = new int[1];
KeySelector<T, ?> keySelector;
public FieldsPartitioner(KeySelector<T, ?> keySelector) {
http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index cd5b9c2..3af7c7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
public enum PartitioningStrategy {
- FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
+ FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY
}