You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:35 UTC
[03/10] flink git commit: [FLINK-1594] [streaming] Embedded StreamEdges
[FLINK-1594] [streaming] Embedded StreamEdges
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8ba72b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8ba72b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8ba72b1
Branch: refs/heads/master
Commit: a8ba72b165dfd2b769783d5612b08559cbf24bf9
Parents: 29a6615
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Feb 26 16:15:17 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100
----------------------------------------------------------------------
.../apache/flink/streaming/api/StreamEdge.java | 2 +-
.../flink/streaming/api/StreamEdgeList.java | 48 ++++++-
.../apache/flink/streaming/api/StreamGraph.java | 125 ++++++++-----------
.../api/StreamingJobGraphGenerator.java | 20 +--
.../flink/streaming/api/WindowingOptimizer.java | 14 +--
5 files changed, 109 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
index 8743233..479ae93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -55,7 +55,7 @@ public class StreamEdge {
return selectedNames;
}
- public StreamPartitioner<?> getOutputPartitioner() {
+ public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
index 85202ab..d15116b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
@@ -90,23 +90,59 @@ public class StreamEdgeList {
}
}
- public List<StreamEdge> getOutEdges(int i) {
- List<StreamEdge> outEdges = outEdgeLists.get(i);
+ public StreamEdge getEdge(int sourceId, int targetId) {
+ Iterator<StreamEdge> outIterator = outEdgeLists.get(sourceId).iterator();
+ while (outIterator.hasNext()) {
+ StreamEdge edge = outIterator.next();
+
+ if (edge.getTargetVertex() == targetId) {
+ return edge;
+ }
+ }
+
+ throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
+ }
+
+ public List<StreamEdge> getOutEdges(int vertexId) {
+ List<StreamEdge> outEdges = outEdgeLists.get(vertexId);
if (outEdges == null) {
- throw new RuntimeException("No such vertex in stream graph: " + i);
+ throw new RuntimeException("No such vertex in stream graph: " + vertexId);
}
return outEdges;
}
- public List<StreamEdge> getInEdges(int i) {
- List<StreamEdge> inEdges = inEdgeLists.get(i);
+ public List<StreamEdge> getInEdges(int vertexId) {
+ List<StreamEdge> inEdges = inEdgeLists.get(vertexId);
if (inEdges == null) {
- throw new RuntimeException("No such vertex in stream graph: " + i);
+ throw new RuntimeException("No such vertex in stream graph: " + vertexId);
}
return inEdges;
}
+
+ public List<Integer> getOutEdgeIndices(int vertexId) {
+ List<StreamEdge> outEdges = getOutEdges(vertexId);
+ List<Integer> outEdgeIndices = new ArrayList<Integer>();
+
+ for (StreamEdge edge : outEdges) {
+ outEdgeIndices.add(edge.getTargetVertex());
+ }
+
+ return outEdgeIndices;
+ }
+
+ public List<Integer> getInEdgeIndices(int vertexId) {
+ List<StreamEdge> inEdges = getInEdges(vertexId);
+
+ List<Integer> inEdgeIndices = new ArrayList<Integer>();
+
+ for (StreamEdge edge : inEdges) {
+ inEdgeIndices.add(edge.getSourceVertex());
+ }
+
+ return inEdgeIndices;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 9f00c8e..dfe66a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -66,11 +66,11 @@ public class StreamGraph extends StreamingPlan {
// Graph attributes
private Map<Integer, Integer> operatorParallelisms;
private Map<Integer, Long> bufferTimeouts;
- private Map<Integer, List<Integer>> outEdgeLists;
- private Map<Integer, List<Integer>> outEdgeTypes;
- private Map<Integer, List<List<String>>> selectedNames;
- private Map<Integer, List<Integer>> inEdgeLists;
- private Map<Integer, List<StreamPartitioner<?>>> outputPartitioners;
+
+ private StreamEdgeList edges;
+
+ private Map<Integer, List<OutputSelector<?>>> outputSelectors;
+
private Map<Integer, String> operatorNames;
private Map<Integer, StreamInvokable<?, ?>> invokableObjects;
private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn1;
@@ -78,7 +78,6 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut1;
private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut2;
private Map<Integer, Class<? extends AbstractInvokable>> jobVertexClasses;
- private Map<Integer, List<OutputSelector<?>>> outputSelectors;
private Map<Integer, Integer> iterationIds;
private Map<Integer, Integer> iterationIDtoHeadID;
private Map<Integer, Integer> iterationIDtoTailID;
@@ -112,17 +111,9 @@ public class StreamGraph extends StreamingPlan {
operatorParallelisms = new HashMap<Integer, Integer>();
containingMaps.add(operatorParallelisms);
bufferTimeouts = new HashMap<Integer, Long>();
- containingMaps.add(bufferTimeouts);
- outEdgeLists = new HashMap<Integer, List<Integer>>();
- containingMaps.add(outEdgeLists);
- outEdgeTypes = new HashMap<Integer, List<Integer>>();
- containingMaps.add(outEdgeTypes);
- selectedNames = new HashMap<Integer, List<List<String>>>();
- containingMaps.add(selectedNames);
- inEdgeLists = new HashMap<Integer, List<Integer>>();
- containingMaps.add(inEdgeLists);
- outputPartitioners = new HashMap<Integer, List<StreamPartitioner<?>>>();
- containingMaps.add(outputPartitioners);
+
+ edges = new StreamEdgeList();
+
operatorNames = new HashMap<Integer, String>();
containingMaps.add(operatorNames);
invokableObjects = new HashMap<Integer, StreamInvokable<?, ?>>();
@@ -221,9 +212,10 @@ public class StreamGraph extends StreamingPlan {
setSerializersFrom(iterationHead, vertexID);
- setEdge(vertexID, iterationHead,
- outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
- new ArrayList<String>());
+ int outpartitionerIndexToCopy = edges.getInEdgeIndices(iterationHead).get(0);
+ StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy).get(0).getPartitioner();
+
+ setEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
iterationTimeouts.put(iterationIDtoHeadID.get(iterationID), waitTime);
@@ -290,7 +282,7 @@ public class StreamGraph extends StreamingPlan {
/**
* Sets vertex parameters in the JobGraph
- *
+ *
* @param vertexID
* Name of the vertex
* @param vertexClass
@@ -307,12 +299,10 @@ public class StreamGraph extends StreamingPlan {
setParallelism(vertexID, parallelism);
invokableObjects.put(vertexID, invokableObject);
operatorNames.put(vertexID, operatorName);
- outEdgeLists.put(vertexID, new ArrayList<Integer>());
- outEdgeTypes.put(vertexID, new ArrayList<Integer>());
- selectedNames.put(vertexID, new ArrayList<List<String>>());
+
+ edges.addVertex(vertexID);
outputSelectors.put(vertexID, new ArrayList<OutputSelector<?>>());
- inEdgeLists.put(vertexID, new ArrayList<Integer>());
- outputPartitioners.put(vertexID, new ArrayList<StreamPartitioner<?>>());
+
iterationTailCount.put(vertexID, 0);
}
@@ -333,40 +323,21 @@ public class StreamGraph extends StreamingPlan {
*/
public void setEdge(Integer upStreamVertexID, Integer downStreamVertexID,
StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
- outEdgeLists.get(upStreamVertexID).add(downStreamVertexID);
- outEdgeTypes.get(upStreamVertexID).add(typeNumber);
- inEdgeLists.get(downStreamVertexID).add(upStreamVertexID);
- outputPartitioners.get(upStreamVertexID).add(partitionerObject);
- selectedNames.get(upStreamVertexID).add(outputNames);
+
+ StreamEdge edge = new StreamEdge(upStreamVertexID, downStreamVertexID, typeNumber, outputNames, partitionerObject);
+ edges.addEdge(edge);
}
public void removeEdge(Integer upStream, Integer downStream) {
- int inputIndex = getInEdges(downStream).indexOf(upStream);
- inEdgeLists.get(downStream).remove(inputIndex);
-
- int outputIndex = getOutEdges(upStream).indexOf(downStream);
- outEdgeLists.get(upStream).remove(outputIndex);
- outEdgeTypes.get(upStream).remove(outputIndex);
- selectedNames.get(upStream).remove(outputIndex);
- outputPartitioners.get(upStream).remove(outputIndex);
+ edges.removeEdge(upStream, downStream);
}
public void removeVertex(Integer toRemove) {
- List<Integer> outEdges = new ArrayList<Integer>(getOutEdges(toRemove));
- List<Integer> inEdges = new ArrayList<Integer>(getInEdges(toRemove));
-
- for (Integer output : outEdges) {
- removeEdge(toRemove, output);
- }
-
- for (Integer input : inEdges) {
- removeEdge(input, toRemove);
- }
+ edges.removeVertex(toRemove);
for (Map<Integer, ?> map : containingMaps) {
map.remove(toRemove);
}
-
}
private void addTypeSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
@@ -418,11 +389,11 @@ public class StreamGraph extends StreamingPlan {
/**
* Sets a user defined {@link OutputSelector} for the given operator. Used
* for directed emits.
- *
+ *
* @param vertexID
- * Name of the vertex for which the output selector will be set
+ * Name of the vertex for which the output selector will be set
* @param outputSelector
- * The user defined output selector.
+ * The user defined output selector.
*/
public <T> void setOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
outputSelectors.get(vertexID).add(outputSelector);
@@ -470,11 +441,11 @@ public class StreamGraph extends StreamingPlan {
/**
* Sets TypeSerializerWrapper from one vertex to another, used with some
* sinks.
- *
+ *
* @param from
- * from
+ * from
* @param to
- * to
+ * to
*/
public void setSerializersFrom(Integer from, Integer to) {
operatorNames.put(to, operatorNames.get(from));
@@ -495,9 +466,9 @@ public class StreamGraph extends StreamingPlan {
/**
* Gets the assembled {@link JobGraph} and adds a user specified name for
* it.
- *
+ *
* @param jobGraphName
- * name of the jobGraph
+ * name of the jobGraph
*/
public JobGraph getJobGraph(String jobGraphName) {
@@ -526,28 +497,24 @@ public class StreamGraph extends StreamingPlan {
return sources;
}
- public List<Integer> getOutEdges(Integer vertexID) {
- return outEdgeLists.get(vertexID);
+ public StreamEdge getEdge(Integer sourceId, Integer targetId) {
+ return edges.getEdge(sourceId, targetId);
}
- public List<Integer> getInEdges(Integer vertexID) {
- return inEdgeLists.get(vertexID);
+ public List<StreamEdge> getOutEdges(Integer vertexID) {
+ return edges.getOutEdges(vertexID);
}
- public List<Integer> getOutEdgeTypes(Integer vertexID) {
-
- return outEdgeTypes.get(vertexID);
+ public List<StreamEdge> getInEdges(Integer vertexID) {
+ return edges.getInEdges(vertexID);
}
- public StreamPartitioner<?> getOutPartitioner(Integer upStreamVertex, Integer downStreamVertex) {
- return outputPartitioners.get(upStreamVertex).get(
- outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+ public List<Integer> getOutEdgeIndices(Integer vertexID) {
+ return edges.getOutEdgeIndices(vertexID);
}
- public List<String> getSelectedNames(Integer upStreamVertex, Integer downStreamVertex) {
-
- return selectedNames.get(upStreamVertex).get(
- outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+ public List<Integer> getInEdgeIndices(Integer vertexID) {
+ return edges.getInEdgeIndices(vertexID);
}
public Collection<Integer> getIterationIDs() {
@@ -668,7 +635,9 @@ public class StreamGraph extends StreamingPlan {
JSONArray inputs = new JSONArray();
node.put(PREDECESSORS, inputs);
- for (int inputID : getInEdges(vertexID)) {
+ for (StreamEdge inEdge : getInEdges(vertexID)) {
+ int inputID = inEdge.getSourceVertex();
+
Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ?
edgeRemapings.get(inputID) : inputID;
decorateEdge(inputs, vertexID, mappedID, inputID);
@@ -678,7 +647,9 @@ public class StreamGraph extends StreamingPlan {
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
- for (int operator : getInEdges(vertexID)) {
+ for (StreamEdge inEdge : getInEdges(vertexID)) {
+ int operator = inEdge.getSourceVertex();
+
if (iterationIds.keySet().contains(operator)) {
iterationHead = operator;
}
@@ -718,7 +689,9 @@ public class StreamGraph extends StreamingPlan {
JSONArray inEdges = new JSONArray();
obj.put(PREDECESSORS, inEdges);
- for (int inputID : getInEdges(vertexID)) {
+ for (StreamEdge inEdge : getInEdges(vertexID)) {
+ int inputID = inEdge.getSourceVertex();
+
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, vertexID, inputID, inputID);
} else if (!iterationIds.containsKey(inputID)) {
@@ -737,7 +710,7 @@ public class StreamGraph extends StreamingPlan {
JSONObject input = new JSONObject();
inputArray.put(input);
input.put(ID, mappedInputID);
- input.put(SHIP_STRATEGY, getOutPartitioner(inputID, vertexID).getStrategy());
+ input.put(SHIP_STRATEGY, edges.getEdge(inputID, vertexID).getPartitioner().getStrategy());
input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index ecb6455..607d041 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -104,11 +104,12 @@ public class StreamingJobGraphGenerator {
List<Integer> chainableOutputs = new ArrayList<Integer>();
List<Integer> nonChainableOutputs = new ArrayList<Integer>();
- for (Integer outName : streamGraph.getOutEdges(current)) {
- if (isChainable(current, outName)) {
- chainableOutputs.add(outName);
+ for (StreamEdge outEdge : streamGraph.getOutEdges(current)) {
+ Integer outID = outEdge.getTargetVertex();
+ if (isChainable(current, outID)) {
+ chainableOutputs.add(outID);
} else {
- nonChainableOutputs.add(outName);
+ nonChainableOutputs.add(outID);
}
}
@@ -230,7 +231,7 @@ public class StreamingJobGraphGenerator {
allOutputs.addAll(nonChainableOutputs);
for (Integer output : allOutputs) {
- config.setSelectedNames(output, streamGraph.getSelectedNames(vertexID, output));
+ config.setSelectedNames(output, streamGraph.getEdge(vertexID, output).getSelectedNames());
}
vertexConfigs.put(vertexID, config);
@@ -251,14 +252,13 @@ public class StreamingJobGraphGenerator {
headVertex.getConfiguration()) : chainedConfigs.get(headOfChain).get(
upStreamvertexID);
- List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamvertexID);
+// List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamvertexID);
int numOfInputs = downStreamConfig.getNumberOfInputs();
- downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex));
+ downStreamConfig.setInputIndex(numOfInputs++, streamGraph.getEdge(upStreamvertexID, downStreamvertexID).getTypeNumber());
downStreamConfig.setNumberOfInputs(numOfInputs);
- StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamvertexID,
- downStreamvertexID);
+ StreamPartitioner<?> partitioner = streamGraph.getEdge(upStreamvertexID, downStreamvertexID).getPartitioner();
upStreamConfig.setPartitioner(downStreamvertexID, partitioner);
@@ -284,7 +284,7 @@ public class StreamingJobGraphGenerator {
&& outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
.getChainingStrategy() == ChainingStrategy.ALWAYS)
- && streamGraph.getOutPartitioner(vertexID, outName).getStrategy() == PartitioningStrategy.FORWARD
+ && streamGraph.getEdge(vertexID, outName).getPartitioner().getStrategy() == PartitioningStrategy.FORWARD
&& streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
&& streamGraph.chaining;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
index e2cbc4b..3e98bda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
@@ -54,13 +54,13 @@ public class WindowingOptimizer {
for (Integer flattener : flatteners) {
// Flatteners should have exactly one input
- Integer input = streamGraph.getInEdges(flattener).get(0);
+ Integer input = streamGraph.getInEdges(flattener).get(0).getSourceVertex();
// Check whether the flatten is applied after a merge
if (streamGraph.getInvokable(input) instanceof WindowMerger) {
// Mergers should have exactly one input
- Integer mergeInput = streamGraph.getInEdges(input).get(0);
+ Integer mergeInput = streamGraph.getInEdges(input).get(0).getSourceVertex();
streamGraph.setEdge(mergeInput, flattener, new DistributePartitioner(true), 0,
new ArrayList<String>());
@@ -97,9 +97,9 @@ public class WindowingOptimizer {
boolean inMatching = false;
for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
Set<Integer> discretizerInEdges = new HashSet<Integer>(
- streamGraph.getInEdges(discretizer.f0));
+ streamGraph.getInEdgeIndices(discretizer.f0));
Set<Integer> matchingInEdges = new HashSet<Integer>(
- streamGraph.getInEdges(matching.f1.get(0)));
+ streamGraph.getInEdgeIndices(matching.f1.get(0)));
if (discretizer.f1.equals(matching.f0)
&& discretizerInEdges.equals(matchingInEdges)) {
@@ -130,7 +130,7 @@ public class WindowingOptimizer {
private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplace,
Integer replaceWith) {
// Convert to array to create a copy
- List<Integer> outEdges = new ArrayList<Integer>(streamGraph.getOutEdges(toReplace));
+ List<Integer> outEdges = new ArrayList<Integer>(streamGraph.getOutEdgeIndices(toReplace));
int numOutputs = outEdges.size();
@@ -139,11 +139,11 @@ public class WindowingOptimizer {
Integer output = outEdges.get(i);
streamGraph.setEdge(replaceWith, output,
- streamGraph.getOutPartitioner(toReplace, output), 0, new ArrayList<String>());
+ streamGraph.getEdge(toReplace, output).getPartitioner(), 0, new ArrayList<String>());
streamGraph.removeEdge(toReplace, output);
}
- List<Integer> inEdges = new ArrayList<Integer>(streamGraph.getInEdges(toReplace));
+ List<Integer> inEdges = new ArrayList<Integer>(streamGraph.getInEdgeIndices(toReplace));
// Remove inputs
for (Integer input : inEdges) {
streamGraph.removeEdge(input, toReplace);