You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:47 UTC

[5/8] flink git commit: [hotfix] Fix generics for stream record and watermark multiplexing.

[hotfix] Fix generics for stream record and watermark multiplexing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acae9ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acae9ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acae9ff2

Branch: refs/heads/master
Commit: acae9ff2583384dada84b40a89d3a068e3b2a00c
Parents: 8ba3213
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:07:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/plugable/SerializationDelegate.java |  7 +-
 .../runtime/io/RecordWriterOutput.java          | 21 ++---
 .../runtime/io/StreamInputProcessor.java        | 25 +++---
 .../runtime/io/StreamRecordWriter.java          |  2 +-
 .../runtime/io/StreamTwoInputProcessor.java     | 57 +++++++------
 .../MultiplexingStreamRecordSerializer.java     | 35 ++++++--
 .../runtime/streamrecord/StreamRecord.java      | 16 ++--
 .../streamrecord/StreamRecordSerializer.java    | 65 ++++++++-------
 .../streaming/runtime/tasks/OutputHandler.java  | 85 +++++++++++---------
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 +-
 10 files changed, 185 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
index 3cbaac3..91b6dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
@@ -26,7 +26,12 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
+/**
+ * The serialization delegate exposes an arbitrary element as an {@link IOReadableWritable} for
+ * serialization, with the help of a type serializer.
+ * 
+ * @param <T> The type to be represented as an IOReadableWritable.
+ */
 public class SerializationDelegate<T> implements IOReadableWritable {
 	
 	private T instance;

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f7d8d47..de8c205 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,32 +39,33 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
 
-	private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
-	private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+	private RecordWriter<SerializationDelegate<Object>> recordWriter;
+	
+	private SerializationDelegate<Object> serializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
-			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+			RecordWriter<?> recordWriter,
 			TypeSerializer<OUT> outSerializer,
 			boolean enableWatermarkMultiplexing) {
+		
 		Preconditions.checkNotNull(recordWriter);
 
-		this.recordWriter = recordWriter;
+		this.recordWriter = (RecordWriter<SerializationDelegate<Object>>) recordWriter;
 
-		StreamRecordSerializer<OUT> outRecordSerializer;
+		TypeSerializer<Object> outRecordSerializer;
 		if (enableWatermarkMultiplexing) {
 			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
 		} else {
-			outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+			outRecordSerializer = (TypeSerializer<Object>) (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
 		}
 
 		if (outSerializer != null) {
-			serializationDelegate = new SerializationDelegate(outRecordSerializer);
+			serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
 		}
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void collect(StreamRecord<OUT> record) {
 		serializationDelegate.setInstance(record);
 
@@ -79,9 +80,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked,rawtypes")
 	public void emitWatermark(Watermark mark) {
-		((SerializationDelegate)serializationDelegate).setInstance(mark);
+		serializationDelegate.setInstance(mark);
+		
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4c40e5f..9db0178 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -46,9 +46,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * <p>This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.</p>
  * 
  * @param <IN> The type of the record that can be read with this record reader.
  */
@@ -63,33 +62,35 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
-	int currentChannel = -1;
+	private int currentChannel = -1;
 
 	private boolean isFinished;
 
 	private final BarrierBuffer barrierBuffer;
 
-	private long[] watermarks;
+	private final long[] watermarks;
 	private long lastEmittedWatermark;
 
-	private DeserializationDelegate<Object> deserializationDelegate;
+	private final DeserializationDelegate<Object> deserializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
 		super(InputGateUtil.createInputGate(inputGates));
 
 		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		StreamRecordSerializer<IN> inputRecordSerializer;
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+			this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
 		} else {
-			inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
+			StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
+			this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
+					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
 		}
-		this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
-
+		
 		// Initialize one deserializer per input channel
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+		
 		for (int i = 0; i < recordDeserializers.length; i++) {
 			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index b0e2532..321f3b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -58,7 +58,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		
 		super(writer, channelSelector);
 		
-		checkArgument(timeout < 0);
+		checkArgument(timeout >= 0);
 		
 		if (timeout == 0) {
 			flushAlways = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 82e7936..e235ffe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -64,64 +64,70 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
-	int currentChannel = -1;
+	private int currentChannel = -1;
 
 	private boolean isFinished;
 
 	private final BarrierBuffer barrierBuffer;
 
-	private long[] watermarks1;
+	private final long[] watermarks1;
 	private long lastEmittedWatermark1;
 
-	private long[] watermarks2;
+	private final long[] watermarks2;
 	private long lastEmittedWatermark2;
 
-	private int numInputChannels1;
-	private int numInputChannels2;
+	private final int numInputChannels1;
 
-	private DeserializationDelegate<Object> deserializationDelegate1;
-	private DeserializationDelegate<Object> deserializationDelegate2;
+	private final DeserializationDelegate<Object> deserializationDelegate1;
+	private final DeserializationDelegate<Object> deserializationDelegate2;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public StreamTwoInputProcessor(
 			Collection<InputGate> inputGates1,
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
 			boolean enableWatermarkMultiplexing) {
+		
 		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
 
 		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		StreamRecordSerializer<IN1> inputRecordSerializer1;
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-		} else {
-			inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
+			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
+			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
 		}
-		this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);
-
-		StreamRecordSerializer<IN2> inputRecordSerializer2;
+		else {
+			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
+			this.deserializationDelegate1 = (DeserializationDelegate<Object>)
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+		}
+		
 		if (enableWatermarkMultiplexing) {
-			inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-		} else {
-			inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
+			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
+			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
+		}
+		else {
+			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
+			this.deserializationDelegate2 = (DeserializationDelegate<Object>)
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
 		}
-		this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);
 
 		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
 		}
 
 		// determine which unioned channels belong to input 1 and which belong to input 2
-		numInputChannels1 = 0;
+		int numInputChannels1 = 0;
 		for (InputGate gate: inputGates1) {
 			numInputChannels1 += gate.getNumberOfInputChannels();
 		}
-		numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+		
+		this.numInputChannels1 = numInputChannels1;
+		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
 
 		watermarks1 = new long[numInputChannels1];
 		for (int i = 0; i < numInputChannels1; i++) {
@@ -262,6 +268,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 		}
 	}
 
+	@Override
 	public void cleanup() throws IOException {
 		barrierBuffer.cleanup();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 715f0d2..075c4fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -35,17 +36,36 @@ import java.io.IOException;
  *
  * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
  */
-public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
-
-	private final long IS_WATERMARK = Long.MIN_VALUE;
+public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
 
 	private static final long serialVersionUID = 1L;
 
+	private static final long IS_WATERMARK = Long.MIN_VALUE;
+	
+	protected final TypeSerializer<T> typeSerializer;
+
+	
 	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
-		super(serializer);
-		if (serializer instanceof MultiplexingStreamRecordSerializer) {
+		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
 			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
 		}
+		this.typeSerializer = Preconditions.checkNotNull(serializer);
+	}
+	
+	
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Object> duplicate() {
+		return this;
+	}
+
+	@Override
+	public Object createInstance() {
+		return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
 	}
 
 	@Override
@@ -81,6 +101,11 @@ public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSer
 	}
 
 	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	public void serialize(Object value, DataOutputView target) throws IOException {
 		if (value instanceof StreamRecord) {

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 6521e7f..92ce66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -19,12 +19,13 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 /**
  * One value in a data stream. This stores the value and the associated timestamp.
+ * 
+ * @param <T> The type encapsulated in the stream record.
  */
 public class StreamRecord<T> {
-
-	// We store it as Object so that we can reuse a StreamElement for emitting
-	// elements of a different type while still reusing the timestamp.
-	private Object value;
+	
+	private T value;
+	
 	private long timestamp;
 
 	/**
@@ -52,9 +53,8 @@ public class StreamRecord<T> {
 	/**
 	 * Returns the value wrapped in this stream value.
 	 */
-	@SuppressWarnings("unchecked")
 	public T getValue() {
-		return (T) value;
+		return value;
 	}
 
 	/**
@@ -74,7 +74,7 @@ public class StreamRecord<T> {
 	 */
 	@SuppressWarnings("unchecked")
 	public <X> StreamRecord<X> replace(X element) {
-		this.value = element;
+		this.value = (T) element;
 		return (StreamRecord<X>) this;
 	}
 
@@ -90,7 +90,7 @@ public class StreamRecord<T> {
 	@SuppressWarnings("unchecked")
 	public <X> StreamRecord<X> replace(X value, long timestamp) {
 		this.timestamp = timestamp;
-		this.value = value;
+		this.value = (T) value;
 		return (StreamRecord<X>) this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2619891..e58d3c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -38,11 +38,12 @@ import org.apache.flink.core.memory.DataOutputView;
  *
  * @param <T> The type of value in the {@link StreamRecord}
  */
-public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
 
 	private static final long serialVersionUID = 1L;
 
-	protected final TypeSerializer<T> typeSerializer;
+	private final TypeSerializer<T> typeSerializer;
+	
 
 	public StreamRecordSerializer(TypeSerializer<T> serializer) {
 		if (serializer instanceof StreamRecordSerializer) {
@@ -51,19 +52,36 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 		this.typeSerializer = Preconditions.checkNotNull(serializer);
 	}
 
+	public TypeSerializer<T> getContainedTypeSerializer() {
+		return this.typeSerializer;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  General serializer and type utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamRecordSerializer<T> duplicate() {
+		TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+		return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+	}
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public StreamRecordSerializer<T> duplicate() {
-		return this;
+	public int getLength() {
+		return typeSerializer.getLength();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Type serialization, copying, instantiation
+	// ------------------------------------------------------------------------
+
 	@Override
-	public Object createInstance() {
+	public StreamRecord<T> createInstance() {
 		try {
 			return new StreamRecord<T>(typeSerializer.createInstance());
 		} catch (Exception e) {
@@ -72,46 +90,31 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 	}
 	
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from) {
-		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
-		return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from, Object reuse) {
-		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
-		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-
-		reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0);
+	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+		reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
 		return reuse;
 	}
 
 	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void serialize(Object value, DataOutputView target) throws IOException {
-		StreamRecord<T> record = (StreamRecord<T>) value;
-		typeSerializer.serialize(record.getValue(), target);
+	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+		typeSerializer.serialize(value.getValue(), target);
 	}
 	
 	@Override
-	public Object deserialize(DataInputView source) throws IOException {
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
 		T element = typeSerializer.deserialize(source);
 		return new StreamRecord<T>(element, 0);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object deserialize(Object reuse, DataInputView source) throws IOException {
-		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-		T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
-		reuseRecord.replace(element, 0);
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+		T element = typeSerializer.deserialize(reuse.getValue(), source);
+		reuse.replace(element, 0);
 		return reuse;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index aa55151..84614bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,28 +44,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OutputHandler<OUT> {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
 
-	private StreamTask<OUT, ?> vertex;
-	private StreamConfig configuration;
-	private ClassLoader cl;
-	private Output<StreamRecord<OUT>> outerOutput;
+	private final StreamTask<OUT, ?> vertex;
+	
+	/** The classloader used to access all user code */
+	private final ClassLoader userCodeClassloader;
+	
+	
+	private final Output<StreamRecord<OUT>> outerOutput;
 
-	public List<StreamOperator<?>> chainedOperators;
+	public final List<StreamOperator<?>> chainedOperators;
 
-	private Map<StreamEdge, RecordWriterOutput<?>> outputMap;
+	private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
 
-	private Map<Integer, StreamConfig> chainedConfigs;
-	private List<StreamEdge> outEdgesInOrder;
+	private final Map<Integer, StreamConfig> chainedConfigs;
 
-	/**
-	 * Counters for the number of records emitted and bytes written.
-	 */
-	protected AccumulatorRegistry.Reporter reporter;
+	/** Counters for the number of records emitted and bytes written. */
+	protected final AccumulatorRegistry.Reporter reporter;
 
 
 	public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
@@ -73,17 +75,17 @@ public class OutputHandler<OUT> {
 
 		// Initialize some fields
 		this.vertex = vertex;
-		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
+		StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
 		this.chainedOperators = new ArrayList<StreamOperator<?>>();
 		this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
-		this.cl = vertex.getUserCodeClassLoader();
+		this.userCodeClassloader = vertex.getUserCodeClassLoader();
 
 		// We read the chained configs, and the order of record writer
-		// registrations by outputname
-		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+		// registrations by output name
+		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
 		this.chainedConfigs.put(configuration.getVertexID(), configuration);
 
-		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
 
 		this.reporter = reporter;
 
@@ -133,25 +135,24 @@ public class OutputHandler<OUT> {
 	 * @return Returns the output for the chain starting from the given
 	 * config
 	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
+	@SuppressWarnings("unchecked")
 	private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-
-
+		
 		// We create a wrapper that will encapsulate the chained operators and
 		// network outputs
 
-		OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+		OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
 		CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
 
 		// Create collectors for the network outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
 			Output<?> output = outputMap.get(outputEdge);
 
 			wrapper.addCollector(output, outputEdge);
 		}
 
 		// Create collectors for the chained outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
 			Integer outputId = outputEdge.getTargetId();
 
 			Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
@@ -163,11 +164,12 @@ public class OutputHandler<OUT> {
 			// The current task is the first chained task at this vertex so we
 			// return the wrapper
 			return (Output<StreamRecord<X>>) wrapper;
-		} else {
+		}
+		else {
 			// The current task is a part of the chain so we get the chainable
 			// operator which will be returned and set it up using the wrapper
 			OneInputStreamOperator chainableOperator =
-					chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
+					chainedTaskConfig.getStreamOperator(userCodeClassloader);
 
 			StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
 			vertex.contexts.add(chainedContext);
@@ -177,14 +179,20 @@ public class OutputHandler<OUT> {
 			chainedOperators.add(chainableOperator);
 			if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
 				return new ChainingOutput<X>(chainableOperator);
-			} else {
-				StreamRecordSerializer serializerIn1;
+			}
+			else {
+				TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
+				TypeSerializer<StreamRecord<X>> inSerializer;
+				
 				if (vertex.getExecutionConfig().areTimestampsEnabled()) {
-					serializerIn1 = new MultiplexingStreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
-				} else {
-					serializerIn1 = new StreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+					inSerializer = (TypeSerializer<StreamRecord<X>>) 
+							(TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
 				}
-				return new CopyingChainingOutput<X>(chainableOperator, (TypeSerializer<StreamRecord<X>>) serializerIn1);
+				else {
+					inSerializer = new StreamRecordSerializer<X>(typeSer);
+				}
+				
+				return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
 			}
 		}
 
@@ -244,14 +252,14 @@ public class OutputHandler<OUT> {
 	}
 
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		protected OneInputStreamOperator<T, ?> operator;
+		
+		protected final OneInputStreamOperator<T, ?> operator;
 
 		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
@@ -268,7 +276,8 @@ public class OutputHandler<OUT> {
 		public void emitWatermark(Watermark mark) {
 			try {
 				operator.processWatermark(mark);
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward element to operator: {}", e);
 				}
@@ -280,10 +289,12 @@ public class OutputHandler<OUT> {
 		public void close() {
 			try {
 				operator.close();
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward close call to operator.", e);
 				}
+				throw new RuntimeException(e);
 			}
 		}
 	}
@@ -298,12 +309,12 @@ public class OutputHandler<OUT> {
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(serializer.copy(record));
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Could not forward element to operator.", e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 283243e..435831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
@@ -79,9 +78,8 @@ public class StreamTaskTestHarness<OUT> {
 
 	private AbstractInvokable task;
 
-	private TypeInformation<OUT> outputType;
 	private TypeSerializer<OUT> outputSerializer;
-	private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+	private TypeSerializer<Object> outputStreamRecordSerializer;
 
 	private ConcurrentLinkedQueue<Object> outputList;
 
@@ -114,7 +112,6 @@ public class StreamTaskTestHarness<OUT> {
 		streamConfig.setChainStart();
 		streamConfig.setBufferTimeout(0);
 
-		this.outputType = outputType;
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
 	}
@@ -127,7 +124,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	@SuppressWarnings("unchecked")
 	private void initializeOutput() {
-		outputList = new ConcurrentLinkedQueue();
+		outputList = new ConcurrentLinkedQueue<Object>();
 
 		mockEnv.addOutput(outputList, outputStreamRecordSerializer);