You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:47 UTC
[08/11] flink git commit: [FLINK-4460] Make chaining work with side outputs
[FLINK-4460] Make chaining work with side outputs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4069159
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4069159
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4069159
Branch: refs/heads/master
Commit: d4069159c2f4f32fefae1e7491543827632ba8ea
Parents: 5a44a51
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 16 14:09:26 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
.../api/graph/StreamingJobGraphGenerator.java | 1 -
.../streaming/runtime/tasks/OperatorChain.java | 118 +++++++++++++++----
2 files changed, 94 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4069159/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9cf0432..0896eb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -488,7 +488,6 @@ public class StreamingJobGraphGenerator {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
- && edge.getOutputTag() == null // disable chaining for side outputs
&& streamGraph.isChainingEnabled();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4069159/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index e21393a..499d4a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -273,7 +273,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
Output<StreamRecord<T>> output = createChainedOperator(
- containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+ containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag());
allOutputs.add(new Tuple2<>(output, outputEdge));
}
@@ -327,7 +327,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
- List<StreamOperator<?>> allOperators)
+ List<StreamOperator<?>> allOperators,
+ OutputTag<IN> outputTag)
{
// create the output that the operator writes to first. this may recursively create more operators
Output<StreamRecord<OUT>> output = createOutputCollector(
@@ -335,16 +336,17 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
chainedOperator.setup(containingTask, operatorConfig, output);
allOperators.add(chainedOperator);
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
- return new ChainingOutput<>(chainedOperator, this);
+ return new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
- return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+ return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
}
@@ -358,14 +360,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
if (edge.getOutputTag() != null) {
// side output
- outSerializer =
- upStreamConfig.getTypeSerializerSideOut(
- edge.getOutputTag(),
- taskEnvironment.getUserClassLoader());
+ outSerializer = upStreamConfig.getTypeSerializerSideOut(
+ edge.getOutputTag(), taskEnvironment.getUserClassLoader());
} else {
// main output
- outSerializer =
- upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+ outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
}
@SuppressWarnings("unchecked")
@@ -401,18 +400,49 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
protected final StreamStatusProvider streamStatusProvider;
- public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
+ protected final OutputTag<T> outputTag;
+
+ public ChainingOutput(
+ OneInputStreamOperator<T, ?> operator,
+ StreamStatusProvider streamStatusProvider,
+ OutputTag<T> outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+ this.outputTag = outputTag;
}
@Override
public void collect(StreamRecord<T> record) {
+ if (this.outputTag != null) {
+ // we are only responsible for emitting to the main input
+ return;
+ }
+
+ pushToOperator(record);
+ }
+
+ @Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+ // we are only responsible for emitting to the side-output specified by our
+ // OutputTag.
+ return;
+ }
+
+ pushToOperator(record);
+ }
+
+ protected <X> void pushToOperator(StreamRecord<X> record) {
try {
+ // we know that the given outputTag matches our OutputTag so the record
+ // must be of the type that our operator expects.
+ @SuppressWarnings("unchecked")
+ StreamRecord<T> castRecord = (StreamRecord<T>) record;
+
numRecordsIn.inc();
- operator.setKeyContextElement1(record);
- operator.processElement(record);
+ operator.setKeyContextElement1(castRecord);
+ operator.processElement(castRecord);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
@@ -420,11 +450,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
@Override
- public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
- // ignore
- }
-
- @Override
public void emitWatermark(Watermark mark) {
try {
if (streamStatusProvider.getStreamStatus().isActive()) {
@@ -460,25 +485,53 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<T> serializer;
-
+
public CopyingChainingOutput(
OneInputStreamOperator<T, ?> operator,
TypeSerializer<T> serializer,
+ OutputTag<T> outputTag,
StreamStatusProvider streamStatusProvider) {
- super(operator, streamStatusProvider);
+ super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
@Override
public void collect(StreamRecord<T> record) {
+ if (this.outputTag != null) {
+ // we are only responsible for emitting to the main input
+ return;
+ }
+
+ pushToOperator(record);
+ }
+
+ @Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+ // we are only responsible for emitting to the side-output specified by our
+ // OutputTag.
+ return;
+ }
+
+ pushToOperator(record);
+ }
+
+ @Override
+ protected <X> void pushToOperator(StreamRecord<X> record) {
try {
+ // we know that the given outputTag matches our OutputTag so the record
+ // must be of the type that our operator (and Serializer) expects.
+ @SuppressWarnings("unchecked")
+ StreamRecord<T> castRecord = (StreamRecord<T>) record;
+
numRecordsIn.inc();
- StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
+ StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (Exception e) {
throw new RuntimeException("Could not forward element to next operator", e);
}
+
}
}
@@ -508,7 +561,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
- if(outputs.length <= 0) {
+ if (outputs.length <= 0) {
// ignore
} else if(outputs.length == 1) {
outputs[0].emitLatencyMarker(latencyMarker);
@@ -526,8 +579,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
@Override
- public <X> void collect(
- OutputTag<?> outputTag, StreamRecord<X> record) {
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
for (Output<StreamRecord<T>> output : outputs) {
output.collect(outputTag, record);
}
@@ -565,5 +617,23 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
+ @Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ for (int i = 0; i < outputs.length - 1; i++) {
+ Output<StreamRecord<T>> output = outputs[i];
+
+ // due to side outputs, StreamRecords of varying types can pass through the broadcasting
+ // collector so we need to cast
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
+ output.collect(outputTag, shallowCopy);
+ }
+
+ // don't copy for the last output
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ StreamRecord<T> castRecord = (StreamRecord<T>) record;
+ outputs[outputs.length - 1].collect(outputTag, castRecord);
+ }
}
}