You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/24 15:21:44 UTC

[2/2] flink git commit: [streaming] Stream operators robustness improved for serialization

[streaming] Stream operators robustness improved for serialization

This closes #620


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/046f39ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/046f39ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/046f39ea

Branch: refs/heads/master
Commit: 046f39ea448050ea52cb2389b0acad9d58ddafda
Parents: 3f3830d
Author: mbalassi <mb...@apache.org>
Authored: Fri Apr 24 11:04:32 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Apr 24 15:17:11 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamNode.java |  2 +-
 .../flink/streaming/api/operators/StreamFilter.java      |  5 ++---
 .../flink/streaming/api/operators/StreamFlatMap.java     |  6 ++----
 .../apache/flink/streaming/api/operators/StreamFold.java |  7 ++-----
 .../flink/streaming/api/operators/StreamGroupedFold.java |  3 +++
 .../streaming/api/operators/StreamGroupedReduce.java     |  3 ++-
 .../flink/streaming/api/operators/StreamOperator.java    |  2 +-
 .../flink/streaming/api/operators/StreamReduce.java      |  5 ++---
 .../apache/flink/streaming/api/operators/StreamSink.java |  6 ++----
 .../flink/streaming/api/operators/StreamSource.java      |  9 ++++-----
 .../streaming/api/operators/co/CoStreamFlatMap.java      |  9 ++++-----
 .../api/operators/co/CoStreamGroupedReduce.java          | 11 ++++++++---
 .../streaming/api/operators/co/CoStreamOperator.java     |  4 ++--
 .../flink/streaming/api/operators/co/CoStreamReduce.java |  6 ++++--
 .../flink/streaming/api/operators/co/CoStreamWindow.java |  5 ++---
 .../streaming/api/operators/windowing/WindowMerger.java  |  2 +-
 .../api/windowing/policy/MultiEvictionPolicy.java        |  2 +-
 .../streaming/runtime/partitioner/FieldsPartitioner.java |  2 +-
 .../streaming/runtime/partitioner/StreamPartitioner.java |  2 +-
 19 files changed, 45 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index cb07f42..576150e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -46,7 +46,7 @@ public class StreamNode implements Serializable {
 	private Long bufferTimeout = null;
 	private String operatorName;
 
-	private StreamOperator<?, ?> operator;
+	private transient StreamOperator<?, ?> operator;
 	private List<OutputSelector<?>> outputSelectors;
 	private StreamRecordSerializer<?> typeSerializerIn1;
 	private StreamRecordSerializer<?> typeSerializerIn2;

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index d2cddf6..898f5ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -23,12 +23,10 @@ public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	FilterFunction<IN> filterFunction;
 	private boolean collect;
 
 	public StreamFilter(FilterFunction<IN> filterFunction) {
 		super(filterFunction);
-		this.filterFunction = filterFunction;
 	}
 
 	@Override
@@ -39,8 +37,9 @@ public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
-		collect = filterFunction.filter(nextObject);
+		collect = ((FilterFunction<IN>) userFunction).filter(nextObject);
 		if (collect) {
 			collector.collect(nextObject);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index a17b162..2b8a3a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -22,11 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private FlatMapFunction<IN, OUT> flatMapper;
-
 	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
-		this.flatMapper = flatMapper;
 	}
 
 	@Override
@@ -37,8 +34,9 @@ public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
-		flatMapper.flatMap(nextObject, collector);
+		((FlatMapFunction<IN, OUT>) userFunction).flatMap(nextObject, collector);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index fc5f187..542f65c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -24,14 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	protected FoldFunction<IN, OUT> folder;
 	private OUT accumulator;
 	protected TypeSerializer<OUT> outTypeSerializer;
 
 	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue,
 			TypeInformation<OUT> outTypeInformation) {
 		super(folder);
-		this.folder = folder;
 		this.accumulator = initialValue;
 		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 	}
@@ -44,10 +42,9 @@ public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
-
-		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
+		accumulator = ((FoldFunction<IN, OUT>) userFunction).fold(outTypeSerializer.copy(accumulator), nextObject);
 		collector.collect(accumulator);
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 303f1b3..88c75df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -40,9 +40,12 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
 		Object key = nextRecord.getKey(keySelector);
 		OUT accumulator = values.get(key);
+		FoldFunction<IN, OUT> folder = ((FoldFunction<IN, OUT>) userFunction);
+
 		if (accumulator != null) {
 			OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
 			values.put(key, folded);

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index f5c8f21..d254fd4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -36,11 +36,12 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
 		Object key = keySelector.getKey(nextObject);
 		IN currentValue = values.get(key);
 		if (currentValue != null) {
-			IN reduced = reducer.reduce(copy(currentValue), nextObject);
+			IN reduced = ((ReduceFunction<IN>) userFunction).reduce(copy(currentValue), nextObject);
 			values.put(key, reduced);
 			collector.collect(reduced);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 6d6c793..5cb3ec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -199,7 +199,7 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
 	}
 
 	public static enum ChainingStrategy {
-		ALWAYS, NEVER, HEAD;
+		ALWAYS, NEVER, HEAD
 	}
 
 	public Function getUserFunction() {

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 179d690..fdf1284 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -22,12 +22,10 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
-	protected ReduceFunction<IN> reducer;
 	private IN currentValue;
 
 	public StreamReduce(ReduceFunction<IN> reducer) {
 		super(reducer);
-		this.reducer = reducer;
 		currentValue = null;
 	}
 
@@ -39,10 +37,11 @@ public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
 
 		if (currentValue != null) {
-			currentValue = reducer.reduce(copy(currentValue), nextObject);
+			currentValue = ((ReduceFunction<IN>) userFunction).reduce(copy(currentValue), nextObject);
 		} else {
 			currentValue = nextObject;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index d1f93d1..26e37fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -22,11 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
-	private SinkFunction<IN> sinkFunction;
-
 	public StreamSink(SinkFunction<IN> sinkFunction) {
 		super(sinkFunction);
-		this.sinkFunction = sinkFunction;
 	}
 
 	@Override
@@ -37,7 +34,8 @@ public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
-		sinkFunction.invoke(nextObject);
+		((SinkFunction<IN>) userFunction).invoke(nextObject);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 8c834f5..ef253ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -25,11 +25,8 @@ public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Seria
 
 	private static final long serialVersionUID = 1L;
 
-	private SourceFunction<OUT> sourceFunction;
-
 	public StreamSource(SourceFunction<OUT> sourceFunction) {
 		super(sourceFunction);
-		this.sourceFunction = sourceFunction;
 	}
 
 	@Override
@@ -38,13 +35,15 @@ public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Seria
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
-		sourceFunction.run(collector);
+		((SourceFunction<OUT>) userFunction).run(collector);
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public void cancel() {
 		super.cancel();
-		sourceFunction.cancel();
+		((SourceFunction<OUT>) userFunction).cancel();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 004a17a..95f089c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -22,11 +22,8 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
-
 	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
-		this.flatMapper = flatMapper;
 	}
 
 	@Override
@@ -40,14 +37,16 @@ public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, O
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction1() throws Exception {
-		flatMapper.flatMap1(reuse1.getObject(), collector);
+		((CoFlatMapFunction<IN1, IN2, OUT>) userFunction).flatMap1(reuse1.getObject(), collector);
 
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction2() throws Exception {
-		flatMapper.flatMap2(reuse2.getObject(), collector);
+		((CoFlatMapFunction<IN1, IN2, OUT>) userFunction).flatMap2(reuse2.getObject(), collector);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
index 2ed3b2e..0a76392 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -36,7 +36,6 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
 	public CoStreamGroupedReduce(CoReduceFunction<IN1, IN2, OUT> coReducer,
 			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
 		super(coReducer);
-		this.coReducer = coReducer;
 		this.keySelector1 = keySelector1;
 		this.keySelector2 = keySelector2;
 		values1 = new HashMap<Object, IN1>();
@@ -44,7 +43,9 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public void handleStream1() throws Exception {
+		CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
 		Object key = reuse1.getKey(keySelector1);
 		currentValue1 = values1.get(key);
 		nextValue1 = reuse1.getObject();
@@ -59,7 +60,9 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public void handleStream2() throws Exception {
+		CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
 		Object key = reuse2.getKey(keySelector2);
 		currentValue2 = values2.get(key);
 		nextValue2 = reuse2.getObject();
@@ -74,14 +77,16 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction1() throws Exception {
-		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+		reduced1 = ((CoReduceFunction<IN1, IN2, OUT>) userFunction).reduce1(currentValue1, nextValue1);
 
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction2() throws Exception {
-		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+		reduced2 = ((CoReduceFunction<IN1, IN2, OUT>) userFunction).reduce2(currentValue2, nextValue2);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
index 214cb17..5e764ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
@@ -122,11 +122,11 @@ public abstract class CoStreamOperator<IN1, IN2, OUT> extends StreamOperator<IN1
 
 	protected void initialize1() {
 
-	};
+	}
 
 	protected void initialize2() {
 
-	};
+	}
 
 	protected void callUserFunctionAndLogException1() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index c280174..90aecc7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -22,7 +22,6 @@ import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
 	protected IN1 currentValue1 = null;
 	protected IN2 currentValue2 = null;
 	protected IN1 nextValue1 = null;
@@ -30,7 +29,6 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 
 	public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		super(coReducer);
-		this.coReducer = coReducer;
 		currentValue1 = null;
 		currentValue2 = null;
 	}
@@ -48,7 +46,9 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction1() throws Exception {
+		CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
 		if (currentValue1 != null) {
 			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
 		} else {
@@ -58,7 +58,9 @@ public class CoStreamReduce<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction2() throws Exception {
+		CoReduceFunction<IN1, IN2, OUT> coReducer = (CoReduceFunction<IN1, IN2, OUT>) userFunction;
 		if (currentValue2 != null) {
 			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index 0875b7e..78371cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
 	protected long windowSize;
 	protected long slideSize;
 	protected CircularFifoList<StreamRecord<IN1>> circularList1;
@@ -46,7 +45,6 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 	public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
 			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		super(coWindowFunction);
-		this.coWindowFunction = coWindowFunction;
 		this.windowSize = windowSize;
 		this.slideSize = slideInterval;
 		this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
@@ -69,6 +67,7 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
 
 		List<IN1> first = new ArrayList<IN1>();
@@ -82,7 +81,7 @@ public class CoStreamWindow<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OU
 		}
 
 		if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
-			coWindowFunction.coWindow(first, second, collector);
+			((CoWindowFunction<IN1, IN2, OUT>) userFunction).coWindow(first, second, collector);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index fc03780..e69257b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -47,8 +47,8 @@ public class WindowMerger<T> extends ChainableStreamOperator<StreamWindow<T>, St
 		}
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
+	@SuppressWarnings("unchecked")
 	protected void callUserFunction() throws Exception {
 		StreamWindow<T> nextWindow = nextObject;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
index 0ad1605..79e8119 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
@@ -53,7 +53,7 @@ public class MultiEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
 	 * </ul>
 	 */
 	public enum EvictionStrategy {
-		MIN, MAX, SUM, PRIORITY;
+		MIN, MAX, SUM, PRIORITY
 	}
 
 	private List<EvictionPolicy<DATA>> allEvictionPolicies;

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
index f44bd12..08c431b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class FieldsPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[1];;
+	private int[] returnArray = new int[1];
 	KeySelector<T, ?> keySelector;
 
 	public FieldsPartitioner(KeySelector<T, ?> keySelector) {

http://git-wip-us.apache.org/repos/asf/flink/blob/046f39ea/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index cd5b9c2..3af7c7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
 
 	public enum PartitioningStrategy {
 
-		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
+		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY
 
 	}