You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/24 20:39:52 UTC
flink git commit: [hotfix] Fix warnings introduced by recent Watermark Commit
Repository: flink
Updated Branches:
refs/heads/master efca79cfb -> 31c0c2925
[hotfix] Fix warnings introduced by recent Watermark Commit
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31c0c292
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31c0c292
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31c0c292
Branch: refs/heads/master
Commit: 31c0c2925c63c2999aa831602f52601d33cd6b47
Parents: efca79c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jul 24 11:29:12 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jul 24 17:44:35 2015 +0200
----------------------------------------------------------------------
.../BroadcastOutputSelectorWrapper.java | 2 +-
.../selector/DirectedOutputSelectorWrapper.java | 2 +-
.../streaming/api/operators/StreamFlatMap.java | 4 ++--
.../operators/windowing/StreamDiscretizer.java | 6 +++---
.../streaming/runtime/io/CollectorWrapper.java | 3 ++-
.../runtime/io/RecordWriterOutput.java | 12 +++++------
.../runtime/io/StreamInputProcessor.java | 10 +++++-----
.../runtime/io/StreamTwoInputProcessor.java | 8 ++++----
.../runtime/streamrecord/StreamRecord.java | 2 +-
.../streamrecord/StreamRecordSerializer.java | 2 +-
.../streaming/runtime/tasks/OutputHandler.java | 21 ++++++++++----------
.../streaming/util/keys/KeySelectorUtil.java | 1 +
.../consumer/StreamTestSingleInputGate.java | 5 +++--
.../api/operators/StreamCounterTest.java | 3 +--
.../api/operators/StreamFilterTest.java | 2 +-
.../api/operators/StreamFlatMapTest.java | 3 +--
.../api/operators/StreamGroupedFoldTest.java | 2 +-
.../api/operators/StreamGroupedReduceTest.java | 2 +-
.../streaming/api/operators/StreamMapTest.java | 2 +-
.../api/operators/StreamProjectTest.java | 17 +++++++++-------
.../api/operators/co/CoStreamFlatMapTest.java | 3 +--
.../api/operators/co/CoStreamMapTest.java | 11 +---------
.../runtime/tasks/OneInputStreamTaskTest.java | 11 ++++------
.../runtime/tasks/SourceStreamTaskTest.java | 5 +++--
.../runtime/tasks/StreamTaskTestHarness.java | 7 +++----
.../runtime/tasks/TwoInputStreamTaskTest.java | 8 ++++----
.../streaming/timestamp/TimestampITCase.java | 1 +
.../util/OneInputStreamOperatorTestHarness.java | 7 +++----
.../flink/streaming/util/TestHarnessUtil.java | 8 ++++----
.../util/TwoInputStreamOperatorTestHarness.java | 6 +++---
30 files changed, 83 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index 0fe84d8..00c6f80 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -33,7 +33,7 @@ public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrappe
outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked,rawtypes")
@Override
public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
Collector output1 = output;
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index 46b315d..c6e3388 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -47,7 +47,7 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked,rawtypes")
@Override
public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
Collector output1 = output;
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index 5547c6a..ff7f662 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -28,7 +28,7 @@ public class StreamFlatMap<IN, OUT>
private static final long serialVersionUID = 1L;
- private TimestampedCollector<OUT> collector;
+ private transient TimestampedCollector<OUT> collector;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
@@ -38,7 +38,7 @@ public class StreamFlatMap<IN, OUT>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- collector = new TimestampedCollector(output);
+ collector = new TimestampedCollector<OUT>(output);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index df84b62..47c2323 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -132,7 +132,7 @@ public class StreamDiscretizer<IN>
* if not empty
*/
protected void emitWindow() {
- output.collect(new StreamRecord(windowEvent.setTrigger()));
+ output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setTrigger()));
}
private void activeEvict(Object input) {
@@ -144,7 +144,7 @@ public class StreamDiscretizer<IN>
}
if (numToEvict > 0) {
- output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
+ output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
bufferSize -= numToEvict;
bufferSize = bufferSize >= 0 ? bufferSize : 0;
}
@@ -154,7 +154,7 @@ public class StreamDiscretizer<IN>
int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
if (numToEvict > 0) {
- output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
+ output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
bufferSize -= numToEvict;
bufferSize = bufferSize >= 0 ? bufferSize : 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
index 2f9d1d6..6bb44dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -38,7 +38,8 @@ public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
allOutputs = new ArrayList<Output<OUT>>();
}
- public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+ @SuppressWarnings("unchecked,rawtypes")
+ public void addCollector(Output<StreamRecord<?>> output, StreamEdge edge) {
outputSelectorWrapper.addCollector(output, edge);
allOutputs.add((Output) output);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index e9cbb7d..b656bb5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,12 +39,12 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
- private RecordWriter<SerializationDelegate> recordWriter;
- private SerializationDelegate serializationDelegate;
+ private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
+ private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
@SuppressWarnings("unchecked")
public RecordWriterOutput(
- RecordWriter<SerializationDelegate> recordWriter,
+ RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
boolean enableWatermarkMultiplexing) {
Preconditions.checkNotNull(recordWriter);
@@ -79,9 +79,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
}
@Override
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked,rawtypes")
public void emitWatermark(Watermark mark) {
- serializationDelegate.setInstance(mark);
+ ((SerializationDelegate)serializationDelegate).setInstance(mark);
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
@@ -95,7 +95,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
@Override
public void close() {
if (recordWriter instanceof StreamRecordWriter) {
- ((StreamRecordWriter) recordWriter).close();
+ ((StreamRecordWriter<?>) recordWriter).close();
} else {
try {
recordWriter.flush();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index e665710..4c40e5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -57,9 +57,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
- private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
+ private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
- private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
+ private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
@@ -72,7 +72,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private long[] watermarks;
private long lastEmittedWatermark;
- private DeserializationDelegate deserializationDelegate;
+ private DeserializationDelegate<Object> deserializationDelegate;
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
@@ -86,12 +86,12 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
} else {
inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
}
- this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
+ this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}
watermarks = new long[inputGate.getNumberOfInputChannels()];
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 1fe98bb..82e7936 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -58,9 +58,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
- private final RecordDeserializer[] recordDeserializers;
+ private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
- private RecordDeserializer currentRecordDeserializer;
+ private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
@@ -79,8 +79,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
private int numInputChannels1;
private int numInputChannels2;
- private DeserializationDelegate deserializationDelegate1;
- private DeserializationDelegate deserializationDelegate2;
+ private DeserializationDelegate<Object> deserializationDelegate1;
+ private DeserializationDelegate<Object> deserializationDelegate2;
@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index aff030e..6521e7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -103,7 +103,7 @@ public class StreamRecord<T> {
return false;
}
- StreamRecord that = (StreamRecord) o;
+ StreamRecord<?> that = (StreamRecord<?>) o;
return value.equals(that.value) && timestamp == that.timestamp;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index b05eb36..2619891 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -58,7 +58,7 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
@Override
@SuppressWarnings("unchecked")
- public TypeSerializer duplicate() {
+ public StreamRecordSerializer<T> duplicate() {
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index cf17b3e..aa55151 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,7 +44,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,18 +145,18 @@ public class OutputHandler<OUT> {
// Create collectors for the network outputs
for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
- Collector<?> outCollector = outputMap.get(outputEdge);
+ Output<?> output = outputMap.get(outputEdge);
- wrapper.addCollector(outCollector, outputEdge);
+ wrapper.addCollector(output, outputEdge);
}
// Create collectors for the chained outputs
for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
- Integer output = outputEdge.getTargetId();
+ Integer outputId = outputEdge.getTargetId();
- Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap);
+ Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
- wrapper.addCollector(outCollector, outputEdge);
+ wrapper.addCollector(output, outputEdge);
}
if (chainedTaskConfig.isChainStart()) {
@@ -200,7 +199,7 @@ public class OutputHandler<OUT> {
* the configuration of its source task
*
* @param outputVertex
- * Name of the output to which the streamoutput will be set up
+ * Name of the output to which the stream output will be set up
* @param upStreamConfig
* The config of upStream task
* @return The created StreamOutput
@@ -222,7 +221,7 @@ public class OutputHandler<OUT> {
output.setReporter(reporter);
@SuppressWarnings("unchecked")
- RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>((RecordWriter) output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
+ RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
if (LOG.isTraceEnabled()) {
LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
@@ -245,9 +244,9 @@ public class OutputHandler<OUT> {
}
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
- protected OneInputStreamOperator operator;
+ protected OneInputStreamOperator<T, ?> operator;
- public ChainingOutput(OneInputStreamOperator<?, T> operator) {
+ public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}
@@ -292,7 +291,7 @@ public class OutputHandler<OUT> {
private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<StreamRecord<T>> serializer;
- public CopyingChainingOutput(OneInputStreamOperator<?, T> operator,
+ public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator,
TypeSerializer<StreamRecord<T>> serializer) {
super(operator);
this.serializer = serializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 49f2fe0..89c6142 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -103,6 +103,7 @@ public class KeySelectorUtil {
}
@Override
+ @SuppressWarnings("unchecked")
public K getKey(IN value) throws Exception {
comparator.extractKeys(value, keyArray, 0);
key = (K) keyArray[0];
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index a20436a..c479f95 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -62,6 +62,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;
+ @SuppressWarnings("unchecked")
public StreamTestSingleInputGate(
int numInputChannels,
int bufferSize,
@@ -84,8 +85,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
- final RecordSerializer<SerializationDelegate<StreamRecord<T>>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<StreamRecord<T>>>();
- final SerializationDelegate delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
+ final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
+ final SerializationDelegate<Object> delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
index 3e662ba..dc8024c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
import org.junit.Test;
/**
@@ -43,7 +42,7 @@ public class StreamCounterTest {
OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
index f672a89..bf4fe40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
@@ -58,7 +58,7 @@ public class StreamFilterTest {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
index ac7caa7..e4e29c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
@@ -62,7 +61,7 @@ public class StreamFlatMapTest {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 8499aa2..cb08e65 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -76,7 +76,7 @@ public class StreamGroupedFoldTest {
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index dca1cbb..9e35fa2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -70,7 +70,7 @@ public class StreamGroupedReduceTest {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index d5f2f62..4d12492 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -57,7 +57,7 @@ public class StreamMapTest {
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index ede7db5..e8f0a03 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.joda.time.Instant;
import org.junit.Test;
/**
@@ -75,7 +74,7 @@ public class StreamProjectTest implements Serializable {
OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
@@ -110,13 +109,17 @@ public class StreamProjectTest implements Serializable {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
- @Override
- public Tuple3<Long, Character, Double> map(Long value) throws Exception {
- return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
- }
- })
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple3<Long, Character, Double> map(Long value) throws Exception {
+ return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
+ }
+ })
.project(0, 2)
.addSink(new SinkFunction<Tuple>() {
+ private static final long serialVersionUID = 1L;
+
@Override
@SuppressWarnings("unchecked")
public void invoke(Tuple value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index 2c9ba5c..b8e9619 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
@@ -72,7 +71,7 @@ public class CoStreamFlatMapTest implements Serializable {
TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
index dcf4972..28ae664 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
@@ -18,27 +18,18 @@
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
-import static org.junit.Assert.fail;
-
/**
* Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. These test that:
*
@@ -73,7 +64,7 @@ public class CoStreamMapTest implements Serializable {
TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d623dd8..4399a10 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -28,16 +28,13 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -68,7 +65,7 @@ public class OneInputStreamTaskTest {
streamConfig.setStreamOperator(mapOperator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.invoke();
@@ -103,7 +100,7 @@ public class OneInputStreamTaskTest {
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
@@ -179,7 +176,7 @@ public class OneInputStreamTaskTest {
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
- Queue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
@@ -237,7 +234,7 @@ public class OneInputStreamTaskTest {
StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
streamConfig.setStreamOperator(mapOperator);
- Queue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index f34eafe..0f6e5f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -89,6 +89,7 @@ public class SourceStreamTaskTest {
* source kept emitting elements while the checkpoint was ongoing.
*/
@Test
+ @SuppressWarnings("unchecked")
public void testCheckpointing() throws Exception {
final int NUM_ELEMENTS = 100;
final int NUM_CHECKPOINTS = 100;
@@ -108,7 +109,7 @@ public class SourceStreamTaskTest {
ExecutorService executor = Executors.newFixedThreadPool(10);
- Future[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
+ Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
}
@@ -131,7 +132,7 @@ public class SourceStreamTaskTest {
Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
}
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+ private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
private static final long serialVersionUID = 1;
private int maxElements;
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index a4cc0d3..283243e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
@@ -43,7 +42,6 @@ import org.junit.Assert;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -85,7 +83,7 @@ public class StreamTaskTestHarness<OUT> {
private TypeSerializer<OUT> outputSerializer;
private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
- private ConcurrentLinkedQueue outputList;
+ private ConcurrentLinkedQueue<Object> outputList;
protected Thread taskThread;
@@ -94,6 +92,7 @@ public class StreamTaskTestHarness<OUT> {
// input related methods only need to be implemented once, in generic form
protected int numInputGates;
protected int numInputChannelsPerGate;
+ @SuppressWarnings("rawtypes")
protected StreamTestSingleInputGate[] inputGates;
public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
@@ -198,7 +197,7 @@ public class StreamTaskTestHarness<OUT> {
* {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}}
* to extract only the StreamRecords.
*/
- public Queue getOutput() {
+ public ConcurrentLinkedQueue<Object> getOutput() {
return outputList;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3b113ab..3c7204d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -69,7 +69,7 @@ public class TwoInputStreamTaskTest {
streamConfig.setStreamOperator(coMapOperator);
long initialTime = 0L;
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.invoke();
@@ -107,7 +107,7 @@ public class TwoInputStreamTaskTest {
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
- ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
@@ -186,7 +186,7 @@ public class TwoInputStreamTaskTest {
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
- Queue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
@@ -252,7 +252,7 @@ public class TwoInputStreamTaskTest {
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
- Queue expectedOutput = new ConcurrentLinkedQueue();
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
long initialTime = 0L;
testHarness.invoke();
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d3fde9e..52de8aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -235,6 +235,7 @@ public class TimestampITCase {
env.execute();
}
+ @SuppressWarnings("unchecked")
public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
List<Watermark> watermarks;
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 133f143..6652fde 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -49,14 +48,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
OneInputStreamOperator<IN, OUT> operator;
- ConcurrentLinkedQueue outputList;
+ ConcurrentLinkedQueue<Object> outputList;
ExecutionConfig executionConfig;
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
this.operator = operator;
- outputList = new ConcurrentLinkedQueue();
+ outputList = new ConcurrentLinkedQueue<Object>();
executionConfig = new ExecutionConfig();
@@ -77,7 +76,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
* {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
* to extract only the StreamRecords.
*/
- public Queue getOutput() {
+ public ConcurrentLinkedQueue<Object> getOutput() {
return outputList;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index a0a6c8d..0732b64 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -32,7 +32,7 @@ public class TestHarnessUtil {
* Extracts the StreamRecords from the given output list.
*/
@SuppressWarnings("unchecked")
- public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List output) {
+ public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) {
List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
for (Object e: output) {
if (e instanceof StreamRecord) {
@@ -46,11 +46,11 @@ public class TestHarnessUtil {
* Extracts the raw elements from the given output list.
*/
@SuppressWarnings("unchecked")
- public static <OUT> List<OUT> getRawElementsFromOutput(Queue output) {
+ public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
List<OUT> resultElements = new LinkedList<OUT>();
for (Object e: output) {
if (e instanceof StreamRecord) {
- resultElements.add((OUT) ((StreamRecord) e).getValue());
+ resultElements.add(((StreamRecord<OUT>) e).getValue());
}
}
return resultElements;
@@ -59,7 +59,7 @@ public class TestHarnessUtil {
/**
* Compare the two queues containing operator/task output by converting them to an array first.
*/
- public static void assertOutputEquals(String message, Queue expected, Queue actual) {
+ public static void assertOutputEquals(String message, Queue<Object> expected, Queue<Object> actual) {
Assert.assertArrayEquals(message,
expected.toArray(),
actual.toArray());
http://git-wip-us.apache.org/repos/asf/flink/blob/31c0c292/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index ea753f8..1e8b5c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -48,14 +48,14 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
TwoInputStreamOperator<IN1, IN2, OUT> operator;
- ConcurrentLinkedQueue outputList;
+ ConcurrentLinkedQueue<Object> outputList;
ExecutionConfig executionConfig;
public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
this.operator = operator;
- outputList = new ConcurrentLinkedQueue();
+ outputList = new ConcurrentLinkedQueue<Object>();
executionConfig = new ExecutionConfig();
@@ -76,7 +76,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
* {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
* to extract only the StreamRecords.
*/
- public Queue getOutput() {
+ public ConcurrentLinkedQueue<Object> getOutput() {
return outputList;
}