You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this copy).
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:34 UTC
[02/10] flink git commit: [FLINK-1594] [streaming] Fixed co-tasks input handling
[FLINK-1594] [streaming] Fixed co-tasks input handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3158d1d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3158d1d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3158d1d4
Branch: refs/heads/master
Commit: 3158d1d46d92c166ffd3c7c8eafd899460315016
Parents: a8ba72b
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Feb 27 11:40:01 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/StreamConfig.java | 73 ++++++++++++++------
.../apache/flink/streaming/api/StreamEdge.java | 3 +-
.../api/StreamingJobGraphGenerator.java | 43 ++++++------
.../api/streamvertex/CoStreamVertex.java | 7 +-
.../api/streamvertex/OutputHandler.java | 8 ++-
5 files changed, 89 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index ea19a44..5b6de85 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -60,7 +60,11 @@ public class StreamConfig implements Serializable {
private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
private static final String ITERATON_WAIT = "iterationWait";
private static final String OUTPUTS = "outvertexIDs";
+ private static final String NONCHAINED_OUTPUTS = "NONCHAINED_OUTPUTS";
private static final String EDGES_IN_ORDER = "rwOrder";
+ // Keys must be distinct: in- and out-edges are written into the same Configuration.
+ private static final String OUT_STREAM_EDGES = "outStreamEdges";
+ private static final String IN_STREAM_EDGES = "inStreamEdges";
// DEFAULT VALUES
@@ -281,19 +285,66 @@ public class StreamConfig implements Serializable {
return config.getInteger(NUMBER_OF_OUTPUTS, 0);
}
- public void setOutputs(List<Integer> outputvertexIDs) {
- config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
+ public void setNonChainedOutputs(List<StreamEdge> nonChainedOutputs) {
+ config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) nonChainedOutputs));
}
@SuppressWarnings("unchecked")
- public List<Integer> getOutputs(ClassLoader cl) {
+ public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
try {
- return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+ return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
} catch (Exception e) {
- throw new RuntimeException("Could not instantiate outputs.");
+ throw new RuntimeException("Could not instantiate outputs.", e);
}
}
+ public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
+ config.setBytes(CHAINED_OUTPUTS,
+ SerializationUtils.serialize((Serializable) chainedOutputs));
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
+ try {
+ return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
+ CHAINED_OUTPUTS, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate chained outputs.", e);
+ }
+ }
+
+ /** Serializes the given out-edges into this config (the list must be Serializable). */
+ public void setOutEdges(List<StreamEdge> outEdges) {
+ config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
+ }
+
+ /** Deserializes the edges stored by {@link #setOutEdges(List)} with the given class loader. */
+ @SuppressWarnings("unchecked")
+ public List<StreamEdge> getOutEdges(ClassLoader cl) {
+ try {
+ return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+ this.config, OUT_STREAM_EDGES, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate outputs.", e);
+ }
+ }
+
+ /** Serializes the given in-edges into this config (the list must be Serializable). */
+ public void setInEdges(List<StreamEdge> inEdges) {
+ config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
+ }
+
+ /** Deserializes the edges stored by {@link #setInEdges(List)} with the given class loader. */
+ @SuppressWarnings("unchecked")
+ public List<StreamEdge> getInEdges(ClassLoader cl) {
+ try {
+ return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+ this.config, IN_STREAM_EDGES, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate inputs.", e);
+ }
+ }
+
public void setOutEdgesInOrder(List<Tuple2<Integer, Integer>> outEdgeList) {
config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
@@ -329,21 +380,6 @@ public class StreamConfig implements Serializable {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}
- public void setChainedOutputs(List<Integer> chainedOutputs) {
- config.setBytes(CHAINED_OUTPUTS,
- SerializationUtils.serialize((Serializable) chainedOutputs));
- }
-
- @SuppressWarnings("unchecked")
- public List<Integer> getChainedOutputs(ClassLoader cl) {
- try {
- return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config,
- CHAINED_OUTPUTS, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate chained outputs.");
- }
- }
-
public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
config.setBytes(CHAINED_TASK_CONFIG,
SerializationUtils.serialize((Serializable) chainedTaskConfigs));
@@ -382,9 +418,11 @@ public class StreamConfig implements Serializable {
builder.append("\nTask name: " + getVertexID());
builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
- builder.append("\nOutput names: " + getOutputs(cl));
+ List<StreamEdge> nonChainedOutputs = getNonChainedOutputs(cl);
+ builder.append("\nOutput names: " + nonChainedOutputs);
builder.append("\nPartitioning:");
- for (Integer outputname : getOutputs(cl)) {
+ for (StreamEdge output : nonChainedOutputs) {
+ int outputname = output.getTargetVertex();
builder.append("\n\t" + outputname + ": " + getPartitioner(cl, outputname));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
index 479ae93..7363e08 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -17,11 +17,14 @@
package org.apache.flink.streaming.api;
+import java.io.Serializable;
import java.util.List;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
-public class StreamEdge {
+public class StreamEdge implements Serializable {
+ // Edges are Java-serialized into StreamConfig; pin the UID instead of relying on the compiler-computed default.
+ private static final long serialVersionUID = 1L;
final private int sourceVertex;
final private int targetVertex;
http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 607d041..3ff64ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -101,25 +101,24 @@ public class StreamingJobGraphGenerator {
if (!builtVertices.contains(startNode)) {
List<Tuple2<Integer, Integer>> transitiveOutEdges = new ArrayList<Tuple2<Integer, Integer>>();
- List<Integer> chainableOutputs = new ArrayList<Integer>();
- List<Integer> nonChainableOutputs = new ArrayList<Integer>();
+ List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
+ List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
for (StreamEdge outEdge : streamGraph.getOutEdges(current)) {
- Integer outID = outEdge.getTargetVertex();
- if (isChainable(current, outID)) {
- chainableOutputs.add(outID);
+ if (isChainable(outEdge)) {
+ chainableOutputs.add(outEdge);
} else {
- nonChainableOutputs.add(outID);
+ nonChainableOutputs.add(outEdge);
}
}
- for (Integer chainable : chainableOutputs) {
- transitiveOutEdges.addAll(createChain(startNode, chainable));
+ for (StreamEdge chainedEdge : chainableOutputs) {
+ transitiveOutEdges.addAll(createChain(startNode, chainedEdge.getTargetVertex()));
}
- for (Integer nonChainable : nonChainableOutputs) {
- transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable));
- createChain(nonChainable, nonChainable);
+ for (StreamEdge unchainedEdge : nonChainableOutputs) {
+ transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, unchainedEdge.getTargetVertex()));
+ createChain(unchainedEdge.getTargetVertex(), unchainedEdge.getTargetVertex());
}
chainedNames.put(current, createChainedName(current, chainableOutputs));
@@ -133,6 +132,8 @@ public class StreamingJobGraphGenerator {
config.setChainStart();
config.setOutEdgesInOrder(transitiveOutEdges);
+ config.setOutEdges(streamGraph.getOutEdges(current));
+ config.setInEdges(streamGraph.getInEdges(current));
for (Tuple2<Integer, Integer> edge : transitiveOutEdges) {
connect(startNode, edge);
@@ -157,12 +158,12 @@ public class StreamingJobGraphGenerator {
}
}
- private String createChainedName(Integer vertexID, List<Integer> chainedOutputs) {
+ private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
String operatorName = streamGraph.getOperatorName(vertexID);
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
- for (Integer chainable : chainedOutputs) {
- outputChainedNames.add(chainedNames.get(chainable));
+ for (StreamEdge chainedEdge : chainedOutputs) {
+ outputChainedNames.add(chainedNames.get(chainedEdge.getTargetVertex()));
}
return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
} else if (chainedOutputs.size() == 1) {
@@ -201,7 +202,7 @@ public class StreamingJobGraphGenerator {
}
private void setVertexConfig(Integer vertexID, StreamConfig config,
- List<Integer> chainableOutputs, List<Integer> nonChainableOutputs) {
+ List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
config.setVertexID(vertexID);
config.setBufferTimeout(streamGraph.getBufferTimeout(vertexID));
@@ -215,7 +216,7 @@ public class StreamingJobGraphGenerator {
config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
config.setNumberOfOutputs(nonChainableOutputs.size());
- config.setOutputs(nonChainableOutputs);
+ config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isMonitoringEnabled());
@@ -227,11 +228,11 @@ public class StreamingJobGraphGenerator {
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexID));
}
- List<Integer> allOutputs = new ArrayList<Integer>(chainableOutputs);
+ List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
allOutputs.addAll(nonChainableOutputs);
- for (Integer output : allOutputs) {
- config.setSelectedNames(output, streamGraph.getEdge(vertexID, output).getSelectedNames());
+ for (StreamEdge outputEdge : allOutputs) {
+ config.setSelectedNames(outputEdge.getTargetVertex(), streamGraph.getEdge(vertexID, outputEdge.getTargetVertex()).getSelectedNames());
}
vertexConfigs.put(vertexID, config);
@@ -274,7 +275,9 @@ public class StreamingJobGraphGenerator {
}
}
- private boolean isChainable(Integer vertexID, Integer outName) {
+ private boolean isChainable(StreamEdge candidateEdge) {
+ int vertexID = candidateEdge.getSourceVertex();
+ int outName = candidateEdge.getTargetVertex();
StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID);
StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName);
http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index d794a35..b919e0f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.streamvertex;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
@@ -29,6 +30,7 @@ import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
@@ -78,8 +80,11 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+ List<StreamEdge> inEdges = configuration.getInEdges(userClassLoader);
+
for (int i = 0; i < numberOfInputs; i++) {
- int inputType = configuration.getInputIndex(i);
+ // NOTE(review): assumes in-edge i and input gate i describe the same input; confirm gate order.
+ int inputType = inEdges.get(i).getTypeNumber();
InputGate reader = getEnvironment().getInputGate(i);
switch (inputType) {
case 1:
http://git-wip-us.apache.org/repos/asf/flink/blob/3158d1d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index fd375f6..ca6b34d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.collector.CollectorWrapper;
import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -117,7 +118,8 @@ public class OutputHandler<OUT> {
chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
// Create collectors for the network outputs
- for (Integer output : chainedTaskConfig.getOutputs(cl)) {
+ for (StreamEdge networkEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+ Integer output = networkEdge.getTargetVertex();
Collector<?> outCollector = outputMap.get(output);
@@ -130,7 +132,9 @@ public class OutputHandler<OUT> {
}
// Create collectors for the chained outputs
- for (Integer output : chainedTaskConfig.getChainedOutputs(cl)) {
+ for (StreamEdge chainedEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+ Integer output = chainedEdge.getTargetVertex();
+
Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
if (isDirectEmit) {
((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,