You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:43 UTC

[06/51] [abbrv] git commit: [streaming] Added mutability switch for operators + cleanup

[streaming] Added mutability switch for operators + cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c21f0e35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c21f0e35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c21f0e35

Branch: refs/heads/master
Commit: c21f0e351e0976b66436ee55a5bc12fb461ae3fc
Parents: 0c1ef6d
Author: gyfora <gy...@gmail.com>
Authored: Mon Jul 21 11:24:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |  6 ++
 .../flink/streaming/api/JobGraphBuilder.java    | 19 ++++--
 .../api/StreamExecutionEnvironment.java         | 52 ++++++++-------
 .../api/invokable/DefaultSinkInvokable.java     | 30 ---------
 .../api/invokable/DefaultSourceInvokable.java   | 35 ----------
 .../api/invokable/DefaultTaskInvokable.java     | 31 ---------
 .../streaming/api/invokable/SinkInvokable.java  | 15 +++--
 .../api/invokable/StreamRecordInvokable.java    | 10 ++-
 .../api/invokable/UserSinkInvokable.java        | 31 ---------
 .../operator/BatchReduceInvokable.java          | 17 +++--
 .../api/invokable/operator/FilterInvokable.java | 16 ++++-
 .../invokable/operator/FlatMapInvokable.java    | 11 +++-
 .../api/invokable/operator/MapInvokable.java    | 11 +++-
 .../AbstractStreamComponent.java                |  2 +
 .../api/streamcomponent/StreamSink.java         | 11 ++--
 .../api/streamcomponent/StreamSource.java       |  3 +-
 .../api/streamcomponent/StreamTask.java         |  5 +-
 .../api/streamrecord/StreamRecordTest.java      | 68 --------------------
 .../partitioner/BroadcastPartitionerTest.java   | 10 +--
 .../partitioner/FieldsPartitionerTest.java      | 37 ++++++-----
 .../partitioner/GlobalPartitionerTest.java      | 14 ++--
 .../partitioner/ShufflePartitionerTest.java     | 24 +++----
 22 files changed, 173 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index e3d19f9..d327fad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -129,6 +129,11 @@ public class DataStream<T extends Tuple> {
 	public String getId() {
 		return id;
 	}
+	
+	public DataStream<T> setMutability(boolean isMutable){
+		environment.setMutability(this,isMutable);
+		return this;
+	}
 
 	/**
 	 * Sets the degree of parallelism for this operator. The degree must be 1 or
@@ -590,6 +595,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
+		setMutability(false);
 		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
 		return new DataStream<T>(this);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 02a73b1..2b3ee5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.jobgraph.JobTaskVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
@@ -70,6 +70,7 @@ public class JobGraphBuilder {
 	// Graph attributes
 	private Map<String, AbstractJobVertex> components;
 	private Map<String, Integer> componentParallelism;
+	private Map<String, Boolean> mutability;
 	private Map<String, List<String>> outEdgeList;
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
@@ -100,6 +101,7 @@ public class JobGraphBuilder {
 
 		components = new HashMap<String, AbstractJobVertex>();
 		componentParallelism = new HashMap<String, Integer>();
+		mutability = new HashMap<String, Boolean>();
 		outEdgeList = new HashMap<String, List<String>>();
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
@@ -216,7 +218,7 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
+	public void addSink(String componentName, SinkInvokable<? extends Tuple> InvokableObject,
 			String operatorName, byte[] serializedFunction, int parallelism) {
 
 		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
@@ -286,6 +288,7 @@ public class JobGraphBuilder {
 
 		componentClasses.put(componentName, componentClass);
 		setParallelism(componentName, parallelism);
+		mutability.put(componentName, false);
 		invokableObjects.put(componentName, invokableObject);
 		operatorNames.put(componentName, operatorName);
 		serializedFunctions.put(componentName, serializedFunction);
@@ -333,6 +336,8 @@ public class JobGraphBuilder {
 
 		Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
 
+		config.setBoolean("isMutable", mutability.get(componentName));
+
 		// Set vertex config
 		if (invokableObject != null) {
 			config.setClass("userfunction", invokableObject.getClass());
@@ -423,6 +428,10 @@ public class JobGraphBuilder {
 		componentParallelism.put(componentName, parallelism);
 	}
 
+	public void setMutability(String componentName, boolean isMutable) {
+		mutability.put(componentName, isMutable);
+	}
+
 	/**
 	 * Connects two vertices in the JobGraph using the selected partitioner
 	 * settings
@@ -574,8 +583,8 @@ public class JobGraphBuilder {
 			}
 
 		} catch (JobGraphDefinitionException e) {
-			throw new RuntimeException("Cannot connect components: "
-					+ upStreamComponentName + " to " + downStreamComponentName, e);
+			throw new RuntimeException("Cannot connect components: " + upStreamComponentName
+					+ " to " + downStreamComponentName, e);
 		}
 
 		int outputIndex = upStreamComponent.getNumberOfForwardConnections() - 1;
@@ -638,7 +647,7 @@ public class JobGraphBuilder {
 	public <T extends Tuple> void setOutputSelector(String componentName,
 			byte[] serializedOutputSelector) {
 		outputSelectors.put(componentName, serializedOutputSelector);
-		
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Outputselector set for " + componentName);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index a14b150..c1eca4a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -121,6 +121,10 @@ public abstract class StreamExecutionEnvironment {
 		this.degreeOfParallelism = degreeOfParallelism;
 	}
 
+	protected void setMutability(DataStream<?> stream, boolean isMutable) {
+		jobGraphBuilder.setMutability(stream.getId(), isMutable);
+	}
+
 	/**
 	 * Sets the number of hardware contexts (CPU cores / threads) used when
 	 * executed in {@link LocalStreamEnvironment}.
@@ -226,7 +230,7 @@ public abstract class StreamExecutionEnvironment {
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
 		}
-		
+
 		return returnStream;
 	}
 
@@ -331,7 +335,7 @@ public abstract class StreamExecutionEnvironment {
 
 		jobGraphBuilder.addIterationSink(returnStream.getId(), inputStream.getId(), iterationID,
 				inputStream.getParallelism(), iterationName);
-		
+
 		jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
 
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
@@ -396,7 +400,7 @@ public abstract class StreamExecutionEnvironment {
 
 		return returnStream;
 	}
-	
+
 	/**
 	 * Writes a DataStream to the file specified by path in text format. The
 	 * writing is performed periodically, in every millis milliseconds. For
@@ -420,6 +424,7 @@ public abstract class StreamExecutionEnvironment {
 		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
 				format, millis, endTuple));
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -447,6 +452,7 @@ public abstract class StreamExecutionEnvironment {
 		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
 				format, batchSize, endTuple));
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -473,6 +479,7 @@ public abstract class StreamExecutionEnvironment {
 		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
 				format, millis, endTuple));
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -500,6 +507,7 @@ public abstract class StreamExecutionEnvironment {
 		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
 				format, batchSize, endTuple));
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -558,25 +566,25 @@ public abstract class StreamExecutionEnvironment {
 		jobGraphBuilder.setParallelism(inputStream.getId(), inputStream.degreeOfParallelism);
 	}
 
-//	/**
-//	 * Converts object to byte array using default java serialization
-//	 * 
-//	 * @param object
-//	 *            Object to be serialized
-//	 * @return Serialized object
-//	 */
-//	static byte[] serializeToByteArray(Serializable object) {
-//		SerializationUtils.serialize(object);
-//		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-//		ObjectOutputStream oos;
-//		try {
-//			oos = new ObjectOutputStream(baos);
-//			oos.writeObject(object);
-//		} catch (IOException e) {
-//			throw new RuntimeException("Cannot serialize object: " + object);
-//		}
-//		return baos.toByteArray();
-//	}
+	// /**
+	// * Converts object to byte array using default java serialization
+	// *
+	// * @param object
+	// * Object to be serialized
+	// * @return Serialized object
+	// */
+	// static byte[] serializeToByteArray(Serializable object) {
+	// SerializationUtils.serialize(object);
+	// ByteArrayOutputStream baos = new ByteArrayOutputStream();
+	// ObjectOutputStream oos;
+	// try {
+	// oos = new ObjectOutputStream(baos);
+	// oos.writeObject(object);
+	// } catch (IOException e) {
+	// throw new RuntimeException("Cannot serialize object: " + object);
+	// }
+	// return baos.toByteArray();
+	// }
 
 	// --------------------------------------------------------------------------------------------
 	// Instantiation of Execution Contexts

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
deleted file mode 100644
index 09e766b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class DefaultSinkInvokable<T extends Tuple> extends UserSinkInvokable<T> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {		
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
deleted file mode 100644
index 0920ea0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-public class DefaultSourceInvokable extends UserSourceInvokable<Tuple> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke(Collector<Tuple> collector) throws Exception {
-		collector.collect(new Tuple1<String>("hello flink!"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
deleted file mode 100644
index c86ffcb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class DefaultTaskInvokable<T extends Tuple> extends UserTaskInvokable<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke() throws Exception {
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index c6e4e36..3c14490 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.api.invokable;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
+public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	private SinkFunction<IN> sinkFunction;
@@ -33,8 +33,15 @@ public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			sinkFunction.invoke((IN) reuse.getTuple());
-		}		
+		if (this.isMutable) {
+			while (recordIterator.next(reuse) != null) {
+				sinkFunction.invoke((IN) reuse.getTuple());
+			}
+		} else {
+			while (recordIterator.next(reuse) != null) {
+				sinkFunction.invoke((IN) reuse.getTuple());
+				resetReuse();
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 98eb679..903372b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -32,13 +32,21 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
 
 	protected Collector<OUT> collector;
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+	StreamRecordSerializer<IN> serializer;
 	protected StreamRecord<IN> reuse;
+	protected boolean isMutable;
 
 	public void initialize(Collector<OUT> collector,
 			MutableObjectIterator<StreamRecord<IN>> recordIterator,
-			StreamRecordSerializer<IN> serializer) {
+			StreamRecordSerializer<IN> serializer, boolean isMutable) {
 		this.collector = collector;
 		this.recordIterator = recordIterator;
+		this.serializer = serializer;
+		this.reuse = serializer.createInstance();
+		this.isMutable = isMutable;
+	}
+
+	protected void resetReuse() {
 		this.reuse = serializer.createInstance();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
deleted file mode 100644
index 78214e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class UserSinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN>
-		implements Serializable {
-
-	
-	private static final long serialVersionUID = 1L;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 8e35af2..811a929 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -43,10 +43,18 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 	@Override
 	public void invoke() throws Exception {
 		MyIterator it = new MyIterator();
-		do {
-			reducer.reduce(it, collector);
-			it.reset();
-		} while (reuse != null);
+		if (this.isMutable) {
+			do {
+				reducer.reduce(it, collector);
+				it.reset();
+			} while (reuse != null);
+		} else {
+			do {
+				reducer.reduce(it, collector);
+				it.reset();
+			} while (reuse != null);
+		}
+
 	}
 
 	public class MyIterator implements Iterator<IN> {
@@ -62,6 +70,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 				return false;
 			} else {
 				try {
+					resetReuse();
 					reuse = recordIterator.next(reuse);
 				} catch (IOException e) {
 					e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 1dba6b8..ac79764 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -35,10 +35,20 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
 
 	@Override
 	public void invoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			if (filterFunction.filter(reuse.getTuple())) {
-				collector.collect(reuse.getTuple());
+		if (this.isMutable) {
+			while (recordIterator.next(reuse) != null) {
+				if (filterFunction.filter(reuse.getTuple())) {
+					collector.collect(reuse.getTuple());
+				}
+			}
+		} else {
+			while (recordIterator.next(reuse) != null) {
+				if (filterFunction.filter(reuse.getTuple())) {
+					collector.collect(reuse.getTuple());
+				}
+				resetReuse();
 			}
 		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 36ab1bf..33bda80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -34,8 +34,15 @@ public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
 	}
 
 	public void invoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			flatMapper.flatMap(reuse.getTuple(), collector);
+		if (this.isMutable) {
+			while (recordIterator.next(reuse) != null) {
+				flatMapper.flatMap(reuse.getTuple(), collector);
+			}
+		} else {
+			while (recordIterator.next(reuse) != null) {
+				flatMapper.flatMap(reuse.getTuple(), collector);
+				resetReuse();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 59b9f8f..ff29d15 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -34,8 +34,15 @@ public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskI
 
 	@Override
 	public void invoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			collector.collect(mapper.map(reuse.getTuple()));
+		if (this.isMutable) {
+			while (recordIterator.next(reuse) != null) {
+				collector.collect(mapper.map(reuse.getTuple()));
+			}
+		} else {
+			while (recordIterator.next(reuse) != null) {
+				collector.collect(mapper.map(reuse.getTuple()));
+				resetReuse();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 207d269..f90caf8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -76,6 +76,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 	protected int instanceID;
 	protected String name;
 	private static int numComponents = 0;
+	protected boolean isMutable;
 
 	protected static int newComponent() {
 		numComponents++;
@@ -259,6 +260,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 		StreamComponentInvokable userFunction = null;
 
 		byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
+		this.isMutable = configuration.getBoolean("isMutable", false);
 
 		try {
 			userFunction = deserializeObject(userFunctionSerialized);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 508a10e9..1433ebc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -23,9 +23,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.streaming.api.invokable.DefaultSinkInvokable;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -62,10 +61,10 @@ public class StreamSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Override
 	protected void setInvokable() {
-		Class<? extends UserSinkInvokable> userFunctionClass = configuration.getClass(
-				"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
-		userFunction = (UserSinkInvokable<IN>) getInvokable(userFunctionClass);
-		userFunction.initialize(collector, inputIter, inTupleSerializer);
+		Class<? extends SinkInvokable> userFunctionClass = configuration.getClass("userfunction",
+				SinkInvokable.class, SinkInvokable.class);
+		userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
+		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 4ac7100..108916a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.DefaultSourceInvokable;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -74,7 +73,7 @@ public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent<Tup
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
 		Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
-				"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
+				"userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
 		userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 44f93b3..7da19aa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -29,7 +29,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.DefaultTaskInvokable;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -79,9 +78,9 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
 		Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
-				"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
+				"userfunction", UserTaskInvokable.class, UserTaskInvokable.class);
 		userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
-		userFunction.initialize(collector, inputIter, inTupleSerializer);
+		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
deleted file mode 100755
index 403c863..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.junit.Test;
-
-public class StreamRecordTest {
-
-	@Test
-	public void testReadWrite() throws IOException {
-		StreamRecord<Tuple2<Integer, String>> streamRecord = new StreamRecord<Tuple2<Integer, String>>();
-		Tuple2<Integer, String> tuple = new Tuple2<Integer, String>(2, "a");
-		streamRecord.setTuple(tuple).setId(1);
-
-		TupleSerializer<Tuple2<Integer, String>> ts = (TupleSerializer<Tuple2<Integer, String>>) TypeExtractor
-				.getForObject(tuple).createSerializer();
-
-		SerializationDelegate<Tuple2<Integer, String>> sd = new SerializationDelegate<Tuple2<Integer, String>>(
-				ts);
-		streamRecord.setSeralizationDelegate(sd);
-
-		DataOutputSerializer out = new DataOutputSerializer(64);
-		streamRecord.write(out);
-
-		ByteBuffer buff = out.wrapAsByteBuffer();
-
-		DataInputDeserializer in = new DataInputDeserializer(buff);
-
-		StreamRecord<Tuple2<Integer, String>> streamRecord2 = new StreamRecord<Tuple2<Integer, String>>();
-
-		streamRecord2.setDeseralizationDelegate(
-				new DeserializationDelegate<Tuple2<Integer, String>>(ts), ts);
-
-		streamRecord2.read(in);
-
-		assertEquals(streamRecord.getTuple(), streamRecord2.getTuple());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
index 3a1c8f0..ac5240f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.streaming.partitioner;
 import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,6 +34,7 @@ public class BroadcastPartitionerTest {
 	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
 	
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
 
 	@Before
 	public void setPartitioner() {
@@ -47,9 +49,9 @@ public class BroadcastPartitionerTest {
 		int[] first = new int[] { 0 };
 		int[] second = new int[] { 0, 1 };
 		int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-
-		assertArrayEquals(first, broadcastPartitioner1.selectChannels(streamRecord, 1));
-		assertArrayEquals(second, broadcastPartitioner2.selectChannels(streamRecord, 2));
-		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(streamRecord, 6));
+		sd.setInstance(streamRecord);
+		assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
+		assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
+		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 77dacf7..716a869 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,8 +32,14 @@ import org.junit.Test;
 public class FieldsPartitionerTest {
 
 	private FieldsPartitioner<Tuple> fieldsPartitioner;
-	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>().setTuple(new Tuple2<String, Integer>("test", 0));
-	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>().setTuple(new Tuple2<String, Integer>("test", 42));
+	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
+			.setTuple(new Tuple2<String, Integer>("test", 0));
+	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
+			.setTuple(new Tuple2<String, Integer>("test", 42));
+	private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+	private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
 
 	@Before
 	public void setPartitioner() {
@@ -41,22 +48,22 @@ public class FieldsPartitionerTest {
 
 	@Test
 	public void testSelectChannelsLength() {
-		assertEquals(1,
-				fieldsPartitioner.selectChannels(streamRecord1, 1).length);
-		assertEquals(1,
-				fieldsPartitioner.selectChannels(streamRecord1, 2).length);
-		assertEquals(1,
-				fieldsPartitioner.selectChannels(streamRecord1, 1024).length);
+		sd1.setInstance(streamRecord1);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
 	}
 
 	@Test
 	public void testSelectChannelsGrouping() {
-		assertArrayEquals(fieldsPartitioner.selectChannels(streamRecord1, 1),
-				fieldsPartitioner.selectChannels(streamRecord2, 1));
-		assertArrayEquals(fieldsPartitioner.selectChannels(streamRecord1, 2),
-				fieldsPartitioner.selectChannels(streamRecord2, 2));
-		assertArrayEquals(
-				fieldsPartitioner.selectChannels(streamRecord1, 1024),
-				fieldsPartitioner.selectChannels(streamRecord2, 1024));
+		sd1.setInstance(streamRecord1);
+		sd2.setInstance(streamRecord2);
+
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
+				fieldsPartitioner.selectChannels(sd2, 1));
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
+				fieldsPartitioner.selectChannels(sd2, 2));
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
+				fieldsPartitioner.selectChannels(sd2, 1024));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
index dcf3160..e2703c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.streaming.partitioner;
 import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -30,6 +31,8 @@ public class GlobalPartitionerTest {
 
 	private GlobalPartitioner<Tuple> globalPartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
 
 	@Before
 	public void setPartitioner() {
@@ -40,11 +43,10 @@ public class GlobalPartitionerTest {
 	public void testSelectChannels() {
 		int[] result = new int[] { 0 };
 
-		assertArrayEquals(result,
-				globalPartitioner.selectChannels(streamRecord, 1));
-		assertArrayEquals(result,
-				globalPartitioner.selectChannels(streamRecord, 2));
-		assertArrayEquals(result,
-				globalPartitioner.selectChannels(streamRecord, 1024));
+		sd.setInstance(streamRecord);
+
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
index 32cc2ab..33881e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +32,8 @@ public class ShufflePartitionerTest {
 
 	private ShufflePartitioner<Tuple> shufflePartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
 
 	@Before
 	public void setPartitioner() {
@@ -39,22 +42,21 @@ public class ShufflePartitionerTest {
 
 	@Test
 	public void testSelectChannelsLength() {
-		assertEquals(1,
-				shufflePartitioner.selectChannels(streamRecord, 1).length);
-		assertEquals(1,
-				shufflePartitioner.selectChannels(streamRecord, 2).length);
-		assertEquals(1,
-				shufflePartitioner.selectChannels(streamRecord, 1024).length);
+		sd.setInstance(streamRecord);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
 	}
 
 	@Test
 	public void testSelectChannelsInterval() {
-		assertEquals(0, shufflePartitioner.selectChannels(streamRecord, 1)[0]);
+		sd.setInstance(streamRecord);
+		assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
 
-		assertTrue(0 <= shufflePartitioner.selectChannels(streamRecord, 2)[0]);
-		assertTrue(2 > shufflePartitioner.selectChannels(streamRecord, 2)[0]);
+		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
+		assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
 
-		assertTrue(0 <= shufflePartitioner.selectChannels(streamRecord, 1024)[0]);
-		assertTrue(1024 > shufflePartitioner.selectChannels(streamRecord, 1024)[0]);
+		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
+		assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
 	}
 }