You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/24 20:39:52 UTC

flink git commit: [hotfix] Fix warnings introduced by recent Watermark Commit

Repository: flink
Updated Branches:
  refs/heads/master efca79cfb -> 31c0c2925


[hotfix] Fix warnings introduced by recent Watermark Commit


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31c0c292
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31c0c292
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31c0c292

Branch: refs/heads/master
Commit: 31c0c2925c63c2999aa831602f52601d33cd6b47
Parents: efca79c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jul 24 11:29:12 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jul 24 17:44:35 2015 +0200

----------------------------------------------------------------------
 .../BroadcastOutputSelectorWrapper.java         |  2 +-
 .../selector/DirectedOutputSelectorWrapper.java |  2 +-
 .../streaming/api/operators/StreamFlatMap.java  |  4 ++--
 .../operators/windowing/StreamDiscretizer.java  |  6 +++---
 .../streaming/runtime/io/CollectorWrapper.java  |  3 ++-
 .../runtime/io/RecordWriterOutput.java          | 12 +++++------
 .../runtime/io/StreamInputProcessor.java        | 10 +++++-----
 .../runtime/io/StreamTwoInputProcessor.java     |  8 ++++----
 .../runtime/streamrecord/StreamRecord.java      |  2 +-
 .../streamrecord/StreamRecordSerializer.java    |  2 +-
 .../streaming/runtime/tasks/OutputHandler.java  | 21 ++++++++++----------
 .../streaming/util/keys/KeySelectorUtil.java    |  1 +
 .../consumer/StreamTestSingleInputGate.java     |  5 +++--
 .../api/operators/StreamCounterTest.java        |  3 +--
 .../api/operators/StreamFilterTest.java         |  2 +-
 .../api/operators/StreamFlatMapTest.java        |  3 +--
 .../api/operators/StreamGroupedFoldTest.java    |  2 +-
 .../api/operators/StreamGroupedReduceTest.java  |  2 +-
 .../streaming/api/operators/StreamMapTest.java  |  2 +-
 .../api/operators/StreamProjectTest.java        | 17 +++++++++-------
 .../api/operators/co/CoStreamFlatMapTest.java   |  3 +--
 .../api/operators/co/CoStreamMapTest.java       | 11 +---------
 .../runtime/tasks/OneInputStreamTaskTest.java   | 11 ++++------
 .../runtime/tasks/SourceStreamTaskTest.java     |  5 +++--
 .../runtime/tasks/StreamTaskTestHarness.java    |  7 +++----
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  8 ++++----
 .../streaming/timestamp/TimestampITCase.java    |  1 +
 .../util/OneInputStreamOperatorTestHarness.java |  7 +++----
 .../flink/streaming/util/TestHarnessUtil.java   |  8 ++++----
 .../util/TwoInputStreamOperatorTestHarness.java |  6 +++---
 30 files changed, 83 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index 0fe84d8..00c6f80 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -33,7 +33,7 @@ public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrappe
 		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
 	}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings("unchecked,rawtypes")
 	@Override
 	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
 		Collector output1 = output;

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index 46b315d..c6e3388 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -47,7 +47,7 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
 		this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
 	}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings("unchecked,rawtypes")
 	@Override
 	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
 		Collector output1 = output;

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index 5547c6a..ff7f662 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -28,7 +28,7 @@ public class StreamFlatMap<IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	private TimestampedCollector<OUT> collector;
+	private transient TimestampedCollector<OUT> collector;
 
 	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
@@ -38,7 +38,7 @@ public class StreamFlatMap<IN, OUT>
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
-		collector = new TimestampedCollector(output);
+		collector = new TimestampedCollector<OUT>(output);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index df84b62..47c2323 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -132,7 +132,7 @@ public class StreamDiscretizer<IN>
 	 * if not empty
 	 */
 	protected void emitWindow() {
-		output.collect(new StreamRecord(windowEvent.setTrigger()));
+		output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setTrigger()));
 	}
 
 	private void activeEvict(Object input) {
@@ -144,7 +144,7 @@ public class StreamDiscretizer<IN>
 		}
 
 		if (numToEvict > 0) {
-			output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
+			output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
 			bufferSize -= numToEvict;
 			bufferSize = bufferSize >= 0 ? bufferSize : 0;
 		}
@@ -154,7 +154,7 @@ public class StreamDiscretizer<IN>
 		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
 
 		if (numToEvict > 0) {
-			output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
+			output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
 			bufferSize -= numToEvict;
 			bufferSize = bufferSize >= 0 ? bufferSize : 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
index 2f9d1d6..6bb44dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -38,7 +38,8 @@ public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
 		allOutputs = new ArrayList<Output<OUT>>();
 	}
 
-	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+	@SuppressWarnings("unchecked,rawtypes")
+	public void addCollector(Output<StreamRecord<?>> output, StreamEdge edge) {
 		outputSelectorWrapper.addCollector(output, edge);
 		allOutputs.add((Output) output);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index e9cbb7d..b656bb5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,12 +39,12 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
 
-	private RecordWriter<SerializationDelegate> recordWriter;
-	private SerializationDelegate serializationDelegate;
+	private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
+	private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
-			RecordWriter<SerializationDelegate> recordWriter,
+			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			TypeSerializer<OUT> outSerializer,
 			boolean enableWatermarkMultiplexing) {
 		Preconditions.checkNotNull(recordWriter);
@@ -79,9 +79,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings("unchecked,rawtypes")
 	public void emitWatermark(Watermark mark) {
-		serializationDelegate.setInstance(mark);
+		((SerializationDelegate)serializationDelegate).setInstance(mark);
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
 		} catch (Exception e) {
@@ -95,7 +95,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	@Override
 	public void close() {
 		if (recordWriter instanceof StreamRecordWriter) {
-			((StreamRecordWriter) recordWriter).close();
+			((StreamRecordWriter<?>) recordWriter).close();
 		} else {
 			try {
 				recordWriter.flush();

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index e665710..4c40e5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -57,9 +57,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
 
-	private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
+	private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
 
-	private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
+	private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
@@ -72,7 +72,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 	private long[] watermarks;
 	private long lastEmittedWatermark;
 
-	private DeserializationDelegate deserializationDelegate;
+	private DeserializationDelegate<Object> deserializationDelegate;
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
@@ -86,12 +86,12 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		} else {
 			inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
 		}
-		this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
+		this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
 
 		// Initialize one deserializer per input channel
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
 		}
 
 		watermarks = new long[inputGate.getNumberOfInputChannels()];

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 1fe98bb..82e7936 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -58,9 +58,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
 
-	private final RecordDeserializer[] recordDeserializers;
+	private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
 
-	private RecordDeserializer currentRecordDeserializer;
+	private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
@@ -79,8 +79,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 	private int numInputChannels1;
 	private int numInputChannels2;
 
-	private DeserializationDelegate deserializationDelegate1;
-	private DeserializationDelegate deserializationDelegate2;
+	private DeserializationDelegate<Object> deserializationDelegate1;
+	private DeserializationDelegate<Object> deserializationDelegate2;
 
 	@SuppressWarnings("unchecked")
 	public StreamTwoInputProcessor(

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index aff030e..6521e7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -103,7 +103,7 @@ public class StreamRecord<T> {
 			return false;
 		}
 
-		StreamRecord that = (StreamRecord) o;
+		StreamRecord<?> that = (StreamRecord<?>) o;
 
 		return value.equals(that.value) && timestamp == that.timestamp;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index b05eb36..2619891 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -58,7 +58,7 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public TypeSerializer duplicate() {
+	public StreamRecordSerializer<T> duplicate() {
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index cf17b3e..aa55151 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,7 +44,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,18 +145,18 @@ public class OutputHandler<OUT> {
 
 		// Create collectors for the network outputs
 		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
-			Collector<?> outCollector = outputMap.get(outputEdge);
+			Output<?> output = outputMap.get(outputEdge);
 
-			wrapper.addCollector(outCollector, outputEdge);
+			wrapper.addCollector(output, outputEdge);
 		}
 
 		// Create collectors for the chained outputs
 		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
-			Integer output = outputEdge.getTargetId();
+			Integer outputId = outputEdge.getTargetId();
 
-			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap);
+			Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
 
-			wrapper.addCollector(outCollector, outputEdge);
+			wrapper.addCollector(output, outputEdge);
 		}
 
 		if (chainedTaskConfig.isChainStart()) {
@@ -200,7 +199,7 @@ public class OutputHandler<OUT> {
 	 * the configuration of its source task
 	 *
 	 * @param outputVertex
-	 * 		Name of the output to which the streamoutput will be set up
+	 * 		Name of the output to which the stream output will be set up
 	 * @param upStreamConfig
 	 * 		The config of upStream task
 	 * @return The created StreamOutput
@@ -222,7 +221,7 @@ public class OutputHandler<OUT> {
 		output.setReporter(reporter);
 
 		@SuppressWarnings("unchecked")
-		RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>((RecordWriter) output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
+		RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
 
 		if (LOG.isTraceEnabled()) {
 			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
@@ -245,9 +244,9 @@ public class OutputHandler<OUT> {
 	}
 
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		protected OneInputStreamOperator operator;
+		protected OneInputStreamOperator<T, ?> operator;
 
-		public ChainingOutput(OneInputStreamOperator<?, T> operator) {
+		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
 		}
 
@@ -292,7 +291,7 @@ public class OutputHandler<OUT> {
 	private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
 		private final TypeSerializer<StreamRecord<T>> serializer;
 
-		public CopyingChainingOutput(OneInputStreamOperator<?, T> operator,
+		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator,
 				TypeSerializer<StreamRecord<T>> serializer) {
 			super(operator);
 			this.serializer = serializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 49f2fe0..89c6142 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -103,6 +103,7 @@ public class KeySelectorUtil {
 		}
 
 		@Override
+		@SuppressWarnings("unchecked")
 		public K getKey(IN value) throws Exception {
 			comparator.extractKeys(value, keyArray, 0);
 			key = (K) keyArray[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index a20436a..c479f95 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -62,6 +62,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 	private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;
 
+	@SuppressWarnings("unchecked")
 	public StreamTestSingleInputGate(
 			int numInputChannels,
 			int bufferSize,
@@ -84,8 +85,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 		for (int i = 0; i < numInputChannels; i++) {
 			final int channelIndex = i;
-			final RecordSerializer<SerializationDelegate<StreamRecord<T>>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<StreamRecord<T>>>();
-			final SerializationDelegate delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
+			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
+			final SerializationDelegate<Object> delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
 
 			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
 			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
index 3e662ba..dc8024c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
@@ -43,7 +42,7 @@ public class StreamCounterTest {
 		OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
index f672a89..bf4fe40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
@@ -58,7 +58,7 @@ public class StreamFilterTest {
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
index ac7caa7..e4e29c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,7 +61,7 @@ public class StreamFlatMapTest {
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 8499aa2..cb08e65 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -76,7 +76,7 @@ public class StreamGroupedFoldTest {
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index dca1cbb..9e35fa2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -70,7 +70,7 @@ public class StreamGroupedReduceTest {
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index d5f2f62..4d12492 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -57,7 +57,7 @@ public class StreamMapTest {
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index ede7db5..e8f0a03 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
@@ -75,7 +74,7 @@ public class StreamProjectTest implements Serializable {
 		OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 
@@ -110,13 +109,17 @@ public class StreamProjectTest implements Serializable {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
 
 		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
-				@Override
-				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
-					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
-				}
-			})
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple3<Long, Character, Double> map(Long value) throws Exception {
+				return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
+			}
+		})
 			.project(0, 2)
 			.addSink(new SinkFunction<Tuple>() {
+				private static final long serialVersionUID = 1L;
+
 				@Override
 				@SuppressWarnings("unchecked")
 				public void invoke(Tuple value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index 2c9ba5c..b8e9619 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -72,7 +71,7 @@ public class CoStreamFlatMapTest implements Serializable {
 		TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
index dcf4972..28ae664 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
@@ -18,27 +18,18 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.junit.Assert.fail;
-
 /**
  * Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. These test that:
  *
@@ -73,7 +64,7 @@ public class CoStreamMapTest implements Serializable {
 		TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d623dd8..4399a10 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -28,16 +28,13 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -68,7 +65,7 @@ public class OneInputStreamTaskTest {
 		streamConfig.setStreamOperator(mapOperator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.invoke();
 
@@ -103,7 +100,7 @@ public class OneInputStreamTaskTest {
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
 		streamConfig.setStreamOperator(mapOperator);
 
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();
@@ -179,7 +176,7 @@ public class OneInputStreamTaskTest {
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
 		streamConfig.setStreamOperator(mapOperator);
 
-		Queue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();
@@ -237,7 +234,7 @@ public class OneInputStreamTaskTest {
 		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
 		streamConfig.setStreamOperator(mapOperator);
 
-		Queue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index f34eafe..0f6e5f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -89,6 +89,7 @@ public class SourceStreamTaskTest {
 	 * source kept emitting elements while the checkpoint was ongoing.
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testCheckpointing() throws Exception {
 		final int NUM_ELEMENTS = 100;
 		final int NUM_CHECKPOINTS = 100;
@@ -108,7 +109,7 @@ public class SourceStreamTaskTest {
 
 
 		ExecutorService executor = Executors.newFixedThreadPool(10);
-		Future[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
+		Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
 		for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
 			checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
 		}
@@ -131,7 +132,7 @@ public class SourceStreamTaskTest {
 		Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
 	}
 
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
 		private int maxElements;

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index a4cc0d3..283243e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
@@ -43,7 +42,6 @@ import org.junit.Assert;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 
@@ -85,7 +83,7 @@ public class StreamTaskTestHarness<OUT> {
 	private TypeSerializer<OUT> outputSerializer;
 	private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
 
-	private ConcurrentLinkedQueue outputList;
+	private ConcurrentLinkedQueue<Object> outputList;
 
 	protected Thread taskThread;
 
@@ -94,6 +92,7 @@ public class StreamTaskTestHarness<OUT> {
 	// input related methods only need to be implemented once, in generic form
 	protected int numInputGates;
 	protected int numInputChannelsPerGate;
+	@SuppressWarnings("rawtypes")
 	protected StreamTestSingleInputGate[] inputGates;
 
 	public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
@@ -198,7 +197,7 @@ public class StreamTaskTestHarness<OUT> {
 	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}}
 	 * to extract only the StreamRecords.
 	 */
-	public Queue getOutput() {
+	public ConcurrentLinkedQueue<Object> getOutput() {
 		return outputList;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3b113ab..3c7204d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -69,7 +69,7 @@ public class TwoInputStreamTaskTest {
 		streamConfig.setStreamOperator(coMapOperator);
 
 		long initialTime = 0L;
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 
 		testHarness.invoke();
 
@@ -107,7 +107,7 @@ public class TwoInputStreamTaskTest {
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
 		streamConfig.setStreamOperator(coMapOperator);
 
-		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();
@@ -186,7 +186,7 @@ public class TwoInputStreamTaskTest {
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
 		streamConfig.setStreamOperator(coMapOperator);
 
-		Queue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();
@@ -252,7 +252,7 @@ public class TwoInputStreamTaskTest {
 		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
 		streamConfig.setStreamOperator(coMapOperator);
 
-		Queue expectedOutput = new ConcurrentLinkedQueue();
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
 		long initialTime = 0L;
 
 		testHarness.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d3fde9e..52de8aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -235,6 +235,7 @@ public class TimestampITCase {
 		env.execute();
 	}
 
+	@SuppressWarnings("unchecked")
 	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
 		List<Watermark> watermarks;

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 133f143..6652fde 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -49,14 +48,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 	OneInputStreamOperator<IN, OUT> operator;
 
-	ConcurrentLinkedQueue outputList;
+	ConcurrentLinkedQueue<Object> outputList;
 
 	ExecutionConfig executionConfig;
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
 		this.operator = operator;
 
-		outputList = new ConcurrentLinkedQueue();
+		outputList = new ConcurrentLinkedQueue<Object>();
 
 		executionConfig = new ExecutionConfig();
 
@@ -77,7 +76,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
 	 * to extract only the StreamRecords.
 	 */
-	public Queue getOutput() {
+	public ConcurrentLinkedQueue<Object> getOutput() {
 		return outputList;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index a0a6c8d..0732b64 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -32,7 +32,7 @@ public class TestHarnessUtil {
 	 * Extracts the StreamRecords from the given output list.
 	 */
 	@SuppressWarnings("unchecked")
-	public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List output) {
+	public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) {
 		List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
 		for (Object e: output) {
 			if (e instanceof StreamRecord) {
@@ -46,11 +46,11 @@ public class TestHarnessUtil {
 	 * Extracts the raw elements from the given output list.
 	 */
 	@SuppressWarnings("unchecked")
-	public static <OUT> List<OUT> getRawElementsFromOutput(Queue output) {
+	public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
 		List<OUT> resultElements = new LinkedList<OUT>();
 		for (Object e: output) {
 			if (e instanceof StreamRecord) {
-				resultElements.add((OUT) ((StreamRecord) e).getValue());
+				resultElements.add(((StreamRecord<OUT>) e).getValue());
 			}
 		}
 		return resultElements;
@@ -59,7 +59,7 @@ public class TestHarnessUtil {
 	/**
 	 * Compare the two queues containing operator/task output by converting them to an array first.
 	 */
-	public static void assertOutputEquals(String message, Queue expected, Queue actual) {
+	public static void assertOutputEquals(String message, Queue<Object> expected, Queue<Object> actual) {
 		Assert.assertArrayEquals(message,
 				expected.toArray(),
 				actual.toArray());

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index ea753f8..1e8b5c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -48,14 +48,14 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 
 	TwoInputStreamOperator<IN1, IN2, OUT> operator;
 
-	ConcurrentLinkedQueue outputList;
+	ConcurrentLinkedQueue<Object> outputList;
 
 	ExecutionConfig executionConfig;
 
 	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
 		this.operator = operator;
 
-		outputList = new ConcurrentLinkedQueue();
+		outputList = new ConcurrentLinkedQueue<Object>();
 
 		executionConfig = new ExecutionConfig();
 
@@ -76,7 +76,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
 	 * to extract only the StreamRecords.
 	 */
-	public Queue getOutput() {
+	public ConcurrentLinkedQueue<Object> getOutput() {
 		return outputList;
 	}