You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:47 UTC
[5/8] flink git commit: [hotfix] Fix generics for stream record and
watermark multiplexing.
[hotfix] Fix generics for stream record and watermark multiplexing.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acae9ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acae9ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acae9ff2
Branch: refs/heads/master
Commit: acae9ff2583384dada84b40a89d3a068e3b2a00c
Parents: 8ba3213
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:07:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../runtime/plugable/SerializationDelegate.java | 7 +-
.../runtime/io/RecordWriterOutput.java | 21 ++---
.../runtime/io/StreamInputProcessor.java | 25 +++---
.../runtime/io/StreamRecordWriter.java | 2 +-
.../runtime/io/StreamTwoInputProcessor.java | 57 +++++++------
.../MultiplexingStreamRecordSerializer.java | 35 ++++++--
.../runtime/streamrecord/StreamRecord.java | 16 ++--
.../streamrecord/StreamRecordSerializer.java | 65 ++++++++-------
.../streaming/runtime/tasks/OutputHandler.java | 85 +++++++++++---------
.../runtime/tasks/StreamTaskTestHarness.java | 7 +-
10 files changed, 185 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
index 3cbaac3..91b6dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
@@ -26,7 +26,12 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
+/**
+ * The serialization delegate exposes an arbitrary element as a {@link IOReadableWritable} for
+ * serialization, with the help of a type serializer.
+ *
+ * @param <T> The type to be represented as an IOReadableWritable.
+ */
public class SerializationDelegate<T> implements IOReadableWritable {
private T instance;
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f7d8d47..de8c205 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,32 +39,33 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
- private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
- private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+ private RecordWriter<SerializationDelegate<Object>> recordWriter;
+
+ private SerializationDelegate<Object> serializationDelegate;
@SuppressWarnings("unchecked")
public RecordWriterOutput(
- RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+ RecordWriter<?> recordWriter,
TypeSerializer<OUT> outSerializer,
boolean enableWatermarkMultiplexing) {
+
Preconditions.checkNotNull(recordWriter);
- this.recordWriter = recordWriter;
+ this.recordWriter = (RecordWriter<SerializationDelegate<Object>>) recordWriter;
- StreamRecordSerializer<OUT> outRecordSerializer;
+ TypeSerializer<Object> outRecordSerializer;
if (enableWatermarkMultiplexing) {
outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
} else {
- outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+ outRecordSerializer = (TypeSerializer<Object>) (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
}
if (outSerializer != null) {
- serializationDelegate = new SerializationDelegate(outRecordSerializer);
+ serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
}
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<OUT> record) {
serializationDelegate.setInstance(record);
@@ -79,9 +80,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
}
@Override
- @SuppressWarnings("unchecked,rawtypes")
public void emitWatermark(Watermark mark) {
- ((SerializationDelegate)serializationDelegate).setInstance(mark);
+ serializationDelegate.setInstance(mark);
+
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4c40e5f..9db0178 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -46,9 +46,8 @@ import org.slf4j.LoggerFactory;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
*
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * <p>This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.</p>
*
* @param <IN> The type of the record that can be read with this record reader.
*/
@@ -63,33 +62,35 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
- int currentChannel = -1;
+ private int currentChannel = -1;
private boolean isFinished;
private final BarrierBuffer barrierBuffer;
- private long[] watermarks;
+ private final long[] watermarks;
private long lastEmittedWatermark;
- private DeserializationDelegate<Object> deserializationDelegate;
+ private final DeserializationDelegate<Object> deserializationDelegate;
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
super(InputGateUtil.createInputGate(inputGates));
barrierBuffer = new BarrierBuffer(inputGate, this);
-
- StreamRecordSerializer<IN> inputRecordSerializer;
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+ MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+ this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
} else {
- inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
+ StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
+ this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
+ (NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
}
- this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
-
+
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index b0e2532..321f3b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -58,7 +58,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
super(writer, channelSelector);
- checkArgument(timeout < 0);
+ checkArgument(timeout >= 0);
if (timeout == 0) {
flushAlways = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 82e7936..e235ffe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -64,64 +64,70 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
- int currentChannel = -1;
+ private int currentChannel = -1;
private boolean isFinished;
private final BarrierBuffer barrierBuffer;
- private long[] watermarks1;
+ private final long[] watermarks1;
private long lastEmittedWatermark1;
- private long[] watermarks2;
+ private final long[] watermarks2;
private long lastEmittedWatermark2;
- private int numInputChannels1;
- private int numInputChannels2;
+ private final int numInputChannels1;
- private DeserializationDelegate<Object> deserializationDelegate1;
- private DeserializationDelegate<Object> deserializationDelegate2;
+ private final DeserializationDelegate<Object> deserializationDelegate1;
+ private final DeserializationDelegate<Object> deserializationDelegate2;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public StreamTwoInputProcessor(
Collection<InputGate> inputGates1,
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
boolean enableWatermarkMultiplexing) {
+
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
barrierBuffer = new BarrierBuffer(inputGate, this);
-
- StreamRecordSerializer<IN1> inputRecordSerializer1;
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
- } else {
- inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
+ MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
+ this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
}
- this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);
-
- StreamRecordSerializer<IN2> inputRecordSerializer2;
+ else {
+ StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
+ this.deserializationDelegate1 = (DeserializationDelegate<Object>)
+ (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+ }
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
- } else {
- inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
+ MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
+ this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
+ }
+ else {
+ StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
+ this.deserializationDelegate2 = (DeserializationDelegate<Object>)
+ (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
}
- this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);
// Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+
for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}
// determine which unioned channels belong to input 1 and which belong to input 2
- numInputChannels1 = 0;
+ int numInputChannels1 = 0;
for (InputGate gate: inputGates1) {
numInputChannels1 += gate.getNumberOfInputChannels();
}
- numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+
+ this.numInputChannels1 = numInputChannels1;
+ int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
watermarks1 = new long[numInputChannels1];
for (int i = 0; i < numInputChannels1; i++) {
@@ -262,6 +268,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
+ @Override
public void cleanup() throws IOException {
barrierBuffer.cleanup();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 715f0d2..075c4fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.streamrecord;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -35,17 +36,36 @@ import java.io.IOException;
*
* @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
*/
-public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
-
- private final long IS_WATERMARK = Long.MIN_VALUE;
+public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
private static final long serialVersionUID = 1L;
+ private static final long IS_WATERMARK = Long.MIN_VALUE;
+
+ protected final TypeSerializer<T> typeSerializer;
+
+
public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
- super(serializer);
- if (serializer instanceof MultiplexingStreamRecordSerializer) {
+ if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
}
+ this.typeSerializer = Preconditions.checkNotNull(serializer);
+ }
+
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Object> duplicate() {
+ return this;
+ }
+
+ @Override
+ public Object createInstance() {
+ return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
}
@Override
@@ -81,6 +101,11 @@ public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSer
}
@Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
@SuppressWarnings("unchecked")
public void serialize(Object value, DataOutputView target) throws IOException {
if (value instanceof StreamRecord) {
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 6521e7f..92ce66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -19,12 +19,13 @@ package org.apache.flink.streaming.runtime.streamrecord;
/**
* One value in a data stream. This stores the value and the associated timestamp.
+ *
+ * @param <T> The type encapsulated with the stream record.
*/
public class StreamRecord<T> {
-
- // We store it as Object so that we can reuse a StreamElement for emitting
- // elements of a different type while still reusing the timestamp.
- private Object value;
+
+ private T value;
+
private long timestamp;
/**
@@ -52,9 +53,8 @@ public class StreamRecord<T> {
/**
* Returns the value wrapped in this stream value.
*/
- @SuppressWarnings("unchecked")
public T getValue() {
- return (T) value;
+ return value;
}
/**
@@ -74,7 +74,7 @@ public class StreamRecord<T> {
*/
@SuppressWarnings("unchecked")
public <X> StreamRecord<X> replace(X element) {
- this.value = element;
+ this.value = (T) element;
return (StreamRecord<X>) this;
}
@@ -90,7 +90,7 @@ public class StreamRecord<T> {
@SuppressWarnings("unchecked")
public <X> StreamRecord<X> replace(X value, long timestamp) {
this.timestamp = timestamp;
- this.value = value;
+ this.value = (T) value;
return (StreamRecord<X>) this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2619891..e58d3c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -38,11 +38,12 @@ import org.apache.flink.core.memory.DataOutputView;
*
* @param <T> The type of value in the {@link StreamRecord}
*/
-public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
private static final long serialVersionUID = 1L;
- protected final TypeSerializer<T> typeSerializer;
+ private final TypeSerializer<T> typeSerializer;
+
public StreamRecordSerializer(TypeSerializer<T> serializer) {
if (serializer instanceof StreamRecordSerializer) {
@@ -51,19 +52,36 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
this.typeSerializer = Preconditions.checkNotNull(serializer);
}
+ public TypeSerializer<T> getContainedTypeSerializer() {
+ return this.typeSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // General serializer and type utils
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamRecordSerializer<T> duplicate() {
+ TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+ return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+ }
+
@Override
public boolean isImmutableType() {
return false;
}
@Override
- @SuppressWarnings("unchecked")
- public StreamRecordSerializer<T> duplicate() {
- return this;
+ public int getLength() {
+ return typeSerializer.getLength();
}
+ // ------------------------------------------------------------------------
+ // Type serialization, copying, instantiation
+ // ------------------------------------------------------------------------
+
@Override
- public Object createInstance() {
+ public StreamRecord<T> createInstance() {
try {
return new StreamRecord<T>(typeSerializer.createInstance());
} catch (Exception e) {
@@ -72,46 +90,31 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
}
@Override
- @SuppressWarnings("unchecked")
- public Object copy(Object from) {
- StreamRecord<T> fromRecord = (StreamRecord<T>) from;
- return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+ public StreamRecord<T> copy(StreamRecord<T> from) {
+ return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
}
@Override
- @SuppressWarnings("unchecked")
- public Object copy(Object from, Object reuse) {
- StreamRecord<T> fromRecord = (StreamRecord<T>) from;
- StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-
- reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0);
+ public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+ reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
return reuse;
}
@Override
- public int getLength() {
- return -1;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void serialize(Object value, DataOutputView target) throws IOException {
- StreamRecord<T> record = (StreamRecord<T>) value;
- typeSerializer.serialize(record.getValue(), target);
+ public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+ typeSerializer.serialize(value.getValue(), target);
}
@Override
- public Object deserialize(DataInputView source) throws IOException {
+ public StreamRecord<T> deserialize(DataInputView source) throws IOException {
T element = typeSerializer.deserialize(source);
return new StreamRecord<T>(element, 0);
}
@Override
- @SuppressWarnings("unchecked")
- public Object deserialize(Object reuse, DataInputView source) throws IOException {
- StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
- T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
- reuseRecord.replace(element, 0);
+ public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+ T element = typeSerializer.deserialize(reuse.getValue(), source);
+ reuse.replace(element, 0);
return reuse;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index aa55151..84614bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,28 +44,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OutputHandler<OUT> {
+
private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
- private StreamTask<OUT, ?> vertex;
- private StreamConfig configuration;
- private ClassLoader cl;
- private Output<StreamRecord<OUT>> outerOutput;
+ private final StreamTask<OUT, ?> vertex;
+
+ /** The classloader used to access all user code */
+ private final ClassLoader userCodeClassloader;
+
+
+ private final Output<StreamRecord<OUT>> outerOutput;
- public List<StreamOperator<?>> chainedOperators;
+ public final List<StreamOperator<?>> chainedOperators;
- private Map<StreamEdge, RecordWriterOutput<?>> outputMap;
+ private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
- private Map<Integer, StreamConfig> chainedConfigs;
- private List<StreamEdge> outEdgesInOrder;
+ private final Map<Integer, StreamConfig> chainedConfigs;
- /**
- * Counters for the number of records emitted and bytes written.
- */
- protected AccumulatorRegistry.Reporter reporter;
+ /** Counters for the number of records emitted and bytes written. */
+ protected final AccumulatorRegistry.Reporter reporter;
public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
@@ -73,17 +75,17 @@ public class OutputHandler<OUT> {
// Initialize some fields
this.vertex = vertex;
- this.configuration = new StreamConfig(vertex.getTaskConfiguration());
+ StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
this.chainedOperators = new ArrayList<StreamOperator<?>>();
this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
- this.cl = vertex.getUserCodeClassLoader();
+ this.userCodeClassloader = vertex.getUserCodeClassLoader();
// We read the chained configs, and the order of record writer
- // registrations by outputname
- this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+ // registrations by output name
+ this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
this.chainedConfigs.put(configuration.getVertexID(), configuration);
- this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+ List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
this.reporter = reporter;
@@ -133,25 +135,24 @@ public class OutputHandler<OUT> {
* @return Returns the output for the chain starting from the given
* config
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings("unchecked")
private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-
-
+
// We create a wrapper that will encapsulate the chained operators and
// network outputs
- OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+ OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
// Create collectors for the network outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+ for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
Output<?> output = outputMap.get(outputEdge);
wrapper.addCollector(output, outputEdge);
}
// Create collectors for the chained outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+ for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
Integer outputId = outputEdge.getTargetId();
Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
@@ -163,11 +164,12 @@ public class OutputHandler<OUT> {
// The current task is the first chained task at this vertex so we
// return the wrapper
return (Output<StreamRecord<X>>) wrapper;
- } else {
+ }
+ else {
// The current task is a part of the chain so we get the chainable
// operator which will be returned and set it up using the wrapper
OneInputStreamOperator chainableOperator =
- chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
+ chainedTaskConfig.getStreamOperator(userCodeClassloader);
StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
vertex.contexts.add(chainedContext);
@@ -177,14 +179,20 @@ public class OutputHandler<OUT> {
chainedOperators.add(chainableOperator);
if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
return new ChainingOutput<X>(chainableOperator);
- } else {
- StreamRecordSerializer serializerIn1;
+ }
+ else {
+ TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
+ TypeSerializer<StreamRecord<X>> inSerializer;
+
if (vertex.getExecutionConfig().areTimestampsEnabled()) {
- serializerIn1 = new MultiplexingStreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
- } else {
- serializerIn1 = new StreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+ inSerializer = (TypeSerializer<StreamRecord<X>>)
+ (TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
}
- return new CopyingChainingOutput<X>(chainableOperator, (TypeSerializer<StreamRecord<X>>) serializerIn1);
+ else {
+ inSerializer = new StreamRecordSerializer<X>(typeSer);
+ }
+
+ return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
}
}
@@ -244,14 +252,14 @@ public class OutputHandler<OUT> {
}
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
- protected OneInputStreamOperator<T, ?> operator;
+
+ protected final OneInputStreamOperator<T, ?> operator;
public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<T> record) {
try {
operator.getRuntimeContext().setNextInput(record);
@@ -268,7 +276,8 @@ public class OutputHandler<OUT> {
public void emitWatermark(Watermark mark) {
try {
operator.processWatermark(mark);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward element to operator: {}", e);
}
@@ -280,10 +289,12 @@ public class OutputHandler<OUT> {
public void close() {
try {
operator.close();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward close call to operator.", e);
}
+ throw new RuntimeException(e);
}
}
}
@@ -298,12 +309,12 @@ public class OutputHandler<OUT> {
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<T> record) {
try {
operator.getRuntimeContext().setNextInput(record);
operator.processElement(serializer.copy(record));
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward element to operator.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 283243e..435831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
@@ -79,9 +78,8 @@ public class StreamTaskTestHarness<OUT> {
private AbstractInvokable task;
- private TypeInformation<OUT> outputType;
private TypeSerializer<OUT> outputSerializer;
- private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+ private TypeSerializer<Object> outputStreamRecordSerializer;
private ConcurrentLinkedQueue<Object> outputList;
@@ -114,7 +112,6 @@ public class StreamTaskTestHarness<OUT> {
streamConfig.setChainStart();
streamConfig.setBufferTimeout(0);
- this.outputType = outputType;
outputSerializer = outputType.createSerializer(executionConfig);
outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
}
@@ -127,7 +124,7 @@ public class StreamTaskTestHarness<OUT> {
@SuppressWarnings("unchecked")
private void initializeOutput() {
- outputList = new ConcurrentLinkedQueue();
+ outputList = new ConcurrentLinkedQueue<Object>();
mockEnv.addOutput(outputList, outputStreamRecordSerializer);