You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:34 UTC

[02/10] flink git commit: [FLINK-1594] [streaming] Fixed co-tasks input handling

[FLINK-1594] [streaming] Fixed co-tasks input handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3158d1d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3158d1d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3158d1d4

Branch: refs/heads/master
Commit: 3158d1d46d92c166ffd3c7c8eafd899460315016
Parents: a8ba72b
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Feb 27 11:40:01 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       | 73 ++++++++++++++------
 .../apache/flink/streaming/api/StreamEdge.java  |  3 +-
 .../api/StreamingJobGraphGenerator.java         | 43 ++++++------
 .../api/streamvertex/CoStreamVertex.java        |  7 +-
 .../api/streamvertex/OutputHandler.java         |  8 ++-
 5 files changed, 89 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index ea19a44..5b6de85 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -60,7 +60,11 @@ public class StreamConfig implements Serializable {
 	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
 	private static final String ITERATON_WAIT = "iterationWait";
 	private static final String OUTPUTS = "outvertexIDs";
+	private static final String NONCHAINED_OUTPUTS = "NONCHAINED_OUTPUTS";
+	private static final String CHAINED_OUTPUT_EDGES = "CHAINED_OUTPUTS"; // NOTE(review): unused here; the chained-output accessors use CHAINED_OUTPUTS
 	private static final String EDGES_IN_ORDER = "rwOrder";
+	private static final String OUT_STREAM_EDGES = "out stream edges";
+	private static final String IN_STREAM_EDGES = "in stream edges"; // must differ from OUT_STREAM_EDGES, otherwise setInEdges overwrites the stored out edges
 
 	// DEFAULT VALUES
 
@@ -281,19 +285,60 @@ public class StreamConfig implements Serializable {
 		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
 	}
 
-	public void setOutputs(List<Integer> outputvertexIDs) {
-		config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
+	public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
+		config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
 	}
 
 	@SuppressWarnings("unchecked")
-	public List<Integer> getOutputs(ClassLoader cl) {
+	public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
 		try {
-			return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
 		} catch (Exception e) {
 			throw new RuntimeException("Could not instantiate outputs.");
 		}
 	}
 
+	public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
+		config.setBytes(CHAINED_OUTPUTS,
+				SerializationUtils.serialize((Serializable) chainedOutputs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
+					CHAINED_OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate chained outputs.", e);
+		}
+	}
+
+	public void setOutEdges(List<StreamEdge> outEdges) {
+		config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
+	}
+
+	public List<StreamEdge> getOutEdges(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+					this.config, OUT_STREAM_EDGES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.", e);
+		}
+	}
+
+	public void setInEdges(List<StreamEdge> inEdges) {
+		config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
+	}
+
+	public List<StreamEdge> getInEdges(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+					this.config, IN_STREAM_EDGES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate inputs.", e);
+		}
+	}
+
 	public void setOutEdgesInOrder(List<Tuple2<Integer, Integer>> outEdgeList) {
 
 		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
@@ -329,21 +374,6 @@ public class StreamConfig implements Serializable {
 		return config.getInteger(INPUT_TYPE + inputNumber, 0);
 	}
 
-	public void setChainedOutputs(List<Integer> chainedOutputs) {
-		config.setBytes(CHAINED_OUTPUTS,
-				SerializationUtils.serialize((Serializable) chainedOutputs));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<Integer> getChainedOutputs(ClassLoader cl) {
-		try {
-			return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config,
-					CHAINED_OUTPUTS, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate chained outputs.");
-		}
-	}
-
 	public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
 		config.setBytes(CHAINED_TASK_CONFIG,
 				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
@@ -382,9 +412,10 @@ public class StreamConfig implements Serializable {
 		builder.append("\nTask name: " + getVertexID());
 		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
 		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
-		builder.append("\nOutput names: " + getOutputs(cl));
+		builder.append("\nOutput names: " + getNonChainedOutputs(cl));
 		builder.append("\nPartitioning:");
-		for (Integer outputname : getOutputs(cl)) {
+		for (StreamEdge output : getNonChainedOutputs(cl)) {
+			int outputname = output.getTargetVertex();
 			builder.append("\n\t" + outputname + ": " + getPartitioner(cl, outputname));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
index 479ae93..7363e08 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.streaming.api;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
-public class StreamEdge {
+public class StreamEdge implements Serializable { // lists of edges are written into StreamConfig via SerializationUtils.serialize; NOTE(review): no serialVersionUID, and every field (e.g. a StreamPartitioner, if held) must be Serializable too — confirm
 
 	final private int sourceVertex;
 	final private int targetVertex;

http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 607d041..3ff64ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -101,25 +101,24 @@ public class StreamingJobGraphGenerator {
 		if (!builtVertices.contains(startNode)) {
 
 			List<Tuple2<Integer, Integer>> transitiveOutEdges = new ArrayList<Tuple2<Integer, Integer>>();
-			List<Integer> chainableOutputs = new ArrayList<Integer>();
-			List<Integer> nonChainableOutputs = new ArrayList<Integer>();
+			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
+			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
 
 			for (StreamEdge outEdge : streamGraph.getOutEdges(current)) {
-				Integer outID = outEdge.getTargetVertex();
-				if (isChainable(current, outID)) {
-					chainableOutputs.add(outID);
+				if (isChainable(outEdge)) { // keep the whole edge, not just the target ID, so per-edge data survives
+					chainableOutputs.add(outEdge);
 				} else {
-					nonChainableOutputs.add(outID);
+					nonChainableOutputs.add(outEdge);
 				}
 			}
 
-			for (Integer chainable : chainableOutputs) {
-				transitiveOutEdges.addAll(createChain(startNode, chainable));
+			for (StreamEdge chainable : chainableOutputs) {
+				transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetVertex()));
 			}
 
-			for (Integer nonChainable : nonChainableOutputs) {
-				transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable));
-				createChain(nonChainable, nonChainable);
+			for (StreamEdge nonChainable : nonChainableOutputs) {
+				transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable.getTargetVertex()));
+				createChain(nonChainable.getTargetVertex(), nonChainable.getTargetVertex()); // a non-chainable target starts its own chain
 			}
 
 			chainedNames.put(current, createChainedName(current, chainableOutputs));
@@ -133,6 +132,8 @@ public class StreamingJobGraphGenerator {
 
 				config.setChainStart();
 				config.setOutEdgesInOrder(transitiveOutEdges);
+				config.setOutEdges(streamGraph.getOutEdges(current)); // full StreamEdge objects, read back via StreamConfig.getOutEdges
+				config.setInEdges(streamGraph.getInEdges(current)); // read by CoStreamVertex to look up each input's type number
 
 				for (Tuple2<Integer, Integer> edge : transitiveOutEdges) {
 					connect(startNode, edge);
@@ -157,12 +158,12 @@ public class StreamingJobGraphGenerator {
 		}
 	}
 
-	private String createChainedName(Integer vertexID, List<Integer> chainedOutputs) {
+	private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
 		String operatorName = streamGraph.getOperatorName(vertexID);
 		if (chainedOutputs.size() > 1) {
 			List<String> outputChainedNames = new ArrayList<String>();
-			for (Integer chainable : chainedOutputs) {
-				outputChainedNames.add(chainedNames.get(chainable));
+			for (StreamEdge chainable : chainedOutputs) {
+				outputChainedNames.add(chainedNames.get(chainable.getTargetVertex()));
 			}
 			return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
 		} else if (chainedOutputs.size() == 1) {
@@ -201,7 +202,7 @@ public class StreamingJobGraphGenerator {
 	}
 
 	private void setVertexConfig(Integer vertexID, StreamConfig config,
-			List<Integer> chainableOutputs, List<Integer> nonChainableOutputs) {
+			List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
 
 		config.setVertexID(vertexID);
 		config.setBufferTimeout(streamGraph.getBufferTimeout(vertexID));
@@ -215,7 +216,7 @@ public class StreamingJobGraphGenerator {
 		config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());
-		config.setOutputs(nonChainableOutputs);
+		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
 		config.setStateMonitoring(streamGraph.isMonitoringEnabled());
 
@@ -227,11 +228,11 @@ public class StreamingJobGraphGenerator {
 			config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexID));
 		}
 
-		List<Integer> allOutputs = new ArrayList<Integer>(chainableOutputs);
+		List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
 		allOutputs.addAll(nonChainableOutputs);
 
-		for (Integer output : allOutputs) {
-			config.setSelectedNames(output, streamGraph.getEdge(vertexID, output).getSelectedNames());
+		for (StreamEdge output : allOutputs) {
+			config.setSelectedNames(output.getTargetVertex(), streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames()); // NOTE(review): 'output' already is the edge; re-looking it up by (source, target) may return a different edge when two vertices are connected more than once — confirm
 		}
 
 		vertexConfigs.put(vertexID, config);
@@ -274,7 +275,9 @@ public class StreamingJobGraphGenerator {
 		}
 	}
 
-	private boolean isChainable(Integer vertexID, Integer outName) {
+	private boolean isChainable(StreamEdge edge) {
+		int vertexID = edge.getSourceVertex(); // same locals as the old signature so the body below is unchanged
+		int outName = edge.getTargetVertex();
 
 		StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID);
 		StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName);

http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index d794a35..b919e0f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.streamvertex;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.StreamEdge;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -29,6 +31,7 @@ import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
@@ -78,8 +81,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
 		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
+		List<StreamEdge> inEdges = configuration.getInEdges(userClassLoader); // NOTE(review): assumes in-edge i corresponds to input gate i — confirm the ordering
+
 		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = configuration.getInputIndex(i);
+			int inputType = inEdges.get(i).getTypeNumber(); // type number of the edge selects the switch case below (1 = first co-input)
 			InputGate reader = getEnvironment().getInputGate(i);
 			switch (inputType) {
 			case 1:

http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index fd375f6..ca6b34d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.StreamEdge;
 import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -117,7 +118,8 @@ public class OutputHandler<OUT> {
 				chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
 
 		// Create collectors for the network outputs
-		for (Integer output : chainedTaskConfig.getOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+			Integer output = outputEdge.getTargetVertex(); // network collectors are still keyed by target vertex ID
 
 			Collector<?> outCollector = outputMap.get(output);
 
@@ -130,7 +132,9 @@ public class OutputHandler<OUT> {
 		}
 
 		// Create collectors for the chained outputs
-		for (Integer output : chainedTaskConfig.getChainedOutputs(cl)) {
+		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+			Integer output = outputEdge.getTargetVertex();
+
 			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
 			if (isDirectEmit) {
 				((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,