You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) is not preserved in this version.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:47 UTC

[08/11] flink git commit: [FLINK-4460] Make chaining work with side outputs

[FLINK-4460] Make chaining work with side outputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4069159
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4069159
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4069159

Branch: refs/heads/master
Commit: d4069159c2f4f32fefae1e7491543827632ba8ea
Parents: 5a44a51
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 16 14:09:26 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |   1 -
 .../streaming/runtime/tasks/OperatorChain.java  | 118 +++++++++++++++----
 2 files changed, 94 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4069159/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9cf0432..0896eb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -488,7 +488,6 @@ public class StreamingJobGraphGenerator {
 					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
 				&& (edge.getPartitioner() instanceof ForwardPartitioner)
 				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-				&& edge.getOutputTag() == null // disable chaining for side outputs
 				&& streamGraph.isChainingEnabled();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d4069159/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index e21393a..499d4a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -273,7 +273,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
 
 			Output<StreamRecord<T>> output = createChainedOperator(
-					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag());
 			allOutputs.add(new Tuple2<>(output, outputEdge));
 		}
 		
@@ -327,7 +327,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			Map<Integer, StreamConfig> chainedConfigs,
 			ClassLoader userCodeClassloader,
 			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
-			List<StreamOperator<?>> allOperators)
+			List<StreamOperator<?>> allOperators,
+			OutputTag<IN> outputTag)
 	{
 		// create the output that the operator writes to first. this may recursively create more operators
 		Output<StreamRecord<OUT>> output = createOutputCollector(
@@ -335,16 +336,17 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		// now create the operator and give it the output collector to write its output to
 		OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
 		chainedOperator.setup(containingTask, operatorConfig, output);
 
 		allOperators.add(chainedOperator);
 
 		if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-			return new ChainingOutput<>(chainedOperator, this);
+			return new ChainingOutput<>(chainedOperator, this, outputTag);
 		}
 		else {
 			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-			return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+			return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
 		}
 	}
 	
@@ -358,14 +360,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		if (edge.getOutputTag() != null) {
 			// side output
-			outSerializer =
-					upStreamConfig.getTypeSerializerSideOut(
-							edge.getOutputTag(),
-							taskEnvironment.getUserClassLoader());
+			outSerializer = upStreamConfig.getTypeSerializerSideOut(
+					edge.getOutputTag(), taskEnvironment.getUserClassLoader());
 		} else {
 			// main output
-			outSerializer =
-					upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+			outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
 		}
 
 		@SuppressWarnings("unchecked")
@@ -401,18 +400,49 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		protected final StreamStatusProvider streamStatusProvider;
 
-		public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
+		protected final OutputTag<T> outputTag;
+
+		public ChainingOutput(
+				OneInputStreamOperator<T, ?> operator,
+				StreamStatusProvider streamStatusProvider,
+				OutputTag<T> outputTag) {
 			this.operator = operator;
 			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 			this.streamStatusProvider = streamStatusProvider;
+			this.outputTag = outputTag;
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
+			if (this.outputTag != null) {
+				// we are only responsible for emitting to the main input
+				return;
+			}
+
+			pushToOperator(record);
+		}
+
+		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+				// we are only responsible for emitting to the side-output specified by our
+				// OutputTag.
+				return;
+			}
+
+			pushToOperator(record);
+		}
+
+		protected <X> void pushToOperator(StreamRecord<X> record) {
 			try {
+				// we know that the given outputTag matches our OutputTag so the record
+				// must be of the type that our operator expects.
+				@SuppressWarnings("unchecked")
+				StreamRecord<T> castRecord = (StreamRecord<T>) record;
+
 				numRecordsIn.inc();
-				operator.setKeyContextElement1(record);
-				operator.processElement(record);
+				operator.setKeyContextElement1(castRecord);
+				operator.processElement(castRecord);
 			}
 			catch (Exception e) {
 				throw new ExceptionInChainedOperatorException(e);
@@ -420,11 +450,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
-			// ignore
-		}
-
-		@Override
 		public void emitWatermark(Watermark mark) {
 			try {
 				if (streamStatusProvider.getStreamStatus().isActive()) {
@@ -460,25 +485,53 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
 		
 		private final TypeSerializer<T> serializer;
-		
+
 		public CopyingChainingOutput(
 				OneInputStreamOperator<T, ?> operator,
 				TypeSerializer<T> serializer,
+				OutputTag<T> outputTag,
 				StreamStatusProvider streamStatusProvider) {
-			super(operator, streamStatusProvider);
+			super(operator, streamStatusProvider, outputTag);
 			this.serializer = serializer;
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
+			if (this.outputTag != null) {
+				// we are only responsible for emitting to the main input
+				return;
+			}
+
+			pushToOperator(record);
+		}
+
+		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+				// we are only responsible for emitting to the side-output specified by our
+				// OutputTag.
+				return;
+			}
+
+			pushToOperator(record);
+		}
+
+		@Override
+		protected <X> void pushToOperator(StreamRecord<X> record) {
 			try {
+				// we know that the given outputTag matches our OutputTag so the record
+				// must be of the type that our operator (and Serializer) expects.
+				@SuppressWarnings("unchecked")
+				StreamRecord<T> castRecord = (StreamRecord<T>) record;
+
 				numRecordsIn.inc();
-				StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
+				StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);
 			} catch (Exception e) {
 				throw new RuntimeException("Could not forward element to next operator", e);
 			}
+
 		}
 	}
 	
@@ -508,7 +561,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		@Override
 		public void emitLatencyMarker(LatencyMarker latencyMarker) {
-			if(outputs.length <= 0) {
+			if (outputs.length <= 0) {
 				// ignore
 			} else if(outputs.length == 1) {
 				outputs[0].emitLatencyMarker(latencyMarker);
@@ -526,8 +579,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(
-				OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
 			for (Output<StreamRecord<T>> output : outputs) {
 				output.collect(outputTag, record);
 			}
@@ -565,5 +617,23 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			// don't copy for the last output
 			outputs[outputs.length - 1].collect(record);
 		}
+
+		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			for (int i = 0; i < outputs.length - 1; i++) {
+				Output<StreamRecord<T>> output = outputs[i];
+
+				// due to side outputs, StreamRecords of varying types can pass through the broadcasting
+				// collector so we need to cast
+				@SuppressWarnings({"unchecked", "rawtypes"})
+				StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
+				output.collect(outputTag, shallowCopy);
+			}
+
+			// don't copy for the last output
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			StreamRecord<T> castRecord = (StreamRecord<T>) record;
+			outputs[outputs.length - 1].collect(outputTag, castRecord);
+		}
 	}
 }