You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:13 UTC
[4/6] flink git commit: [FLINK-2398][api-breaking] Introduce
StreamGraphGenerator
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 65736f5..6474ae9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -23,14 +23,15 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,11 +45,11 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -84,11 +85,15 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
+ private Set<Integer> sinks;
+ private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+ private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtuaPartitionNodes;
- private Map<Integer, StreamLoop> streamLoops;
- protected Map<Integer, StreamLoop> vertexIDtoLoop;
protected Map<Integer, String> vertexIDtoBrokerID;
+ protected Map<Integer, Long> vertexIDtoLoopTimeout;
private StateHandleProvider<?> stateHandleProvider;
+ private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
+
private boolean forceCheckpoint = false;
public StreamGraph(StreamExecutionEnvironment environment) {
@@ -104,11 +109,14 @@ public class StreamGraph extends StreamingPlan {
* Remove all registered nodes etc.
*/
public void clear() {
- streamNodes = new HashMap<Integer, StreamNode>();
- streamLoops = new HashMap<Integer, StreamLoop>();
- vertexIDtoLoop = new HashMap<Integer, StreamLoop>();
- vertexIDtoBrokerID = new HashMap<Integer, String>();
- sources = new HashSet<Integer>();
+ streamNodes = Maps.newHashMap();
+ virtualSelectNodes = Maps.newHashMap();
+ virtuaPartitionNodes = Maps.newHashMap();
+ vertexIDtoBrokerID = Maps.newHashMap();
+ vertexIDtoLoopTimeout = Maps.newHashMap();
+ iterationSourceSinkPairs = Sets.newHashSet();
+ sources = Sets.newHashSet();
+ sinks = Sets.newHashSet();
}
protected ExecutionConfig getExecutionConfig() {
@@ -167,7 +175,7 @@ public class StreamGraph extends StreamingPlan {
public boolean isIterative() {
- return !streamLoops.isEmpty();
+ // Only createIterationSourceAndSink registers loop timeouts, so a non-empty map means
+ // at least one iteration source/sink pair exists.
+ return !vertexIDtoLoopTimeout.isEmpty();
}
public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
@@ -176,6 +184,12 @@ public class StreamGraph extends StreamingPlan {
sources.add(vertexID);
}
+ /**
+ * Adds an operator node exactly like {@link #addOperator} and additionally records its ID
+ * as one of the graph's sinks (see {@link #getSinkIDs()}).
+ */
+ public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
+ TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+ // register the node first; only then remember it as a sink
+ this.addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+ this.sinks.add(vertexID);
+ }
+
public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
@@ -212,224 +226,141 @@ public class StreamGraph extends StreamingPlan {
}
}
- public void addIterationHead(Integer iterationHead, Integer iterationID, long timeOut,
- TypeInformation<?> feedbackType) {
- // If there is no loop object created for this iteration create one
- StreamLoop loop = streamLoops.get(iterationID);
- if (loop == null) {
- loop = new StreamLoop(iterationID, timeOut, feedbackType);
- streamLoops.put(iterationID, loop);
- }
-
- loop.addHeadOperator(getStreamNode(iterationHead));
- }
-
- public void addIterationTail(List<DataStream<?>> feedbackStreams, Integer iterationID,
- boolean keepPartitioning) {
+ protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
+ StreamOperator<?> operatorObject, String operatorName) {
- if (!streamLoops.containsKey(iterationID)) {
- throw new RuntimeException("Cannot close iteration without head operator.");
+ if (streamNodes.containsKey(vertexID)) {
+ throw new RuntimeException("Duplicate vertexID " + vertexID);
}
- StreamLoop loop = streamLoops.get(iterationID);
+ StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
+ new ArrayList<OutputSelector<?>>(), vertexClass);
- for (DataStream<?> stream : feedbackStreams) {
- loop.addTailOperator(getStreamNode(stream.getId()), stream.getPartitioner(),
- stream.getSelectedNames());
- }
+ streamNodes.put(vertexID, vertex);
- if (keepPartitioning) {
- loop.applyTailPartitioning();
- }
+ return vertex;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void finalizeLoops() {
-
- // We create each loop separately, the order does not matter as sinks
- // and sources don't interact
- for (StreamLoop loop : streamLoops.values()) {
-
- // We make sure not to re-create the loops if the method is called
- // multiple times
- if (loop.getSourceSinkPairs().isEmpty()) {
-
- List<StreamNode> headOps = loop.getHeads();
- List<StreamNode> tailOps = loop.getTails();
+ /**
+ * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
+ * with the selected names.
+ *
+ * When adding an edge from the virtual node to a downstream node the connection will be made
+ * to the original node, only with the selected names given here.
+ *
+ * @param originalId ID of the node that should be connected to.
+ * @param virtualId ID of the virtual node.
+ * @param selectedNames The selected names.
+ */
+ public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
- // This means that the iteration was not closed. It should not
- // be
- // allowed.
- if (tailOps.isEmpty()) {
- throw new RuntimeException("Cannot execute job with empty iterations.");
- }
+ if (virtualSelectNodes.containsKey(virtualId)) {
+ throw new IllegalStateException("Already has virtual select node with id " + virtualId);
+ }
- // Check whether we keep the feedback partitioning
- if (loop.keepsPartitioning()) {
- // This is the complicated case as we need to enforce
- // partitioning on the tail -> sink side, which
- // requires strict forward connections at source -> head
-
- // We need one source/sink pair per different head
- // parallelism
- // as we depend on strict forwards connections
- Map<Integer, List<StreamNode>> parallelismToHeads = new HashMap<Integer, List<StreamNode>>();
-
- // Group head operators by parallelism
- for (StreamNode head : headOps) {
- int p = head.getParallelism();
- if (!parallelismToHeads.containsKey(p)) {
- parallelismToHeads.put(p, new ArrayList<StreamNode>());
- }
- parallelismToHeads.get(p).add(head);
- }
-
- // We create the sink/source pair for each parallelism
- // group,
- // tails will forward to all sinks but each head operator
- // will
- // only receive from one source (corresponding to its
- // parallelism)
- int c = 0;
- for (Entry<Integer, List<StreamNode>> headGroup : parallelismToHeads.entrySet()) {
- List<StreamNode> headOpsInGroup = headGroup.getValue();
-
- Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop,
- c);
- StreamNode source = sourceSinkPair.f0;
- StreamNode sink = sourceSinkPair.f1;
-
- // We connect the source to the heads in this group
- // (forward), setting
- // type to 2 in case we have a coIteration (this sets
- // the
- // input as the second input of the co-operator)
- for (StreamNode head : headOpsInGroup) {
- int inputType = loop.isCoIteration() ? 2 : 0;
- addEdge(source.getId(), head.getId(), new RebalancePartitioner(true),
- inputType, new ArrayList<String>());
- }
-
- // We connect all the tails to the sink keeping the
- // partitioner
- for (int i = 0; i < tailOps.size(); i++) {
- StreamNode tail = tailOps.get(i);
- StreamPartitioner<?> partitioner = loop.getTailPartitioners().get(i);
- addEdge(tail.getId(), sink.getId(), partitioner.copy(), 0, loop
- .getTailSelectedNames().get(i));
- }
-
- // We set the sink/source parallelism to the group
- // parallelism
- source.setParallelism(headGroup.getKey());
- sink.setParallelism(source.getParallelism());
-
- // We set the proper serializers for the sink/source
- setSerializersFrom(tailOps.get(0).getId(), sink.getId());
- if (loop.isCoIteration()) {
- source.setSerializerOut(loop.getFeedbackType().createSerializer(executionConfig));
- } else {
- setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
- }
-
- c++;
- }
-
- } else {
- // This is the most simple case, we add one iteration
- // sink/source pair with the parallelism of the first tail
- // operator. Tail operators will forward the records and
- // partitioning will be enforced from source -> head
-
- Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, 0);
- StreamNode source = sourceSinkPair.f0;
- StreamNode sink = sourceSinkPair.f1;
-
- // We get the feedback partitioner from the first input of
- // the
- // first head.
- StreamPartitioner<?> partitioner = headOps.get(0).getInEdges().get(0)
- .getPartitioner();
-
- // Connect the sources to heads using this partitioner
- for (StreamNode head : headOps) {
- addEdge(source.getId(), head.getId(), partitioner.copy(), 0,
- new ArrayList<String>());
- }
-
- // The tails are connected to the sink with forward
- // partitioning
- for (int i = 0; i < tailOps.size(); i++) {
- StreamNode tail = tailOps.get(i);
- addEdge(tail.getId(), sink.getId(), new RebalancePartitioner(true), 0, loop
- .getTailSelectedNames().get(i));
- }
-
- // We set the parallelism to match the first tail op to make
- // the
- // forward more efficient
- sink.setParallelism(tailOps.get(0).getParallelism());
- source.setParallelism(sink.getParallelism());
-
- // We set the proper serializers
- setSerializersFrom(headOps.get(0).getId(), source.getId());
- setSerializersFrom(tailOps.get(0).getId(), sink.getId());
- }
+ virtualSelectNodes.put(virtualId,
+ new Tuple2<Integer, List<String>>(originalId, selectedNames));
+ }
- }
+ /**
+ * Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
+ * partitioning.
+ *
+ * When adding an edge from the virtual node to a downstream node the connection will be made
+ * to the original node, but with the partitioning given here.
+ *
+ * @param originalId ID of the node that should be connected to.
+ * @param virtualId ID of the virtual node.
+ * @param partitioner The partitioner
+ */
+ public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
+ if (virtuaPartitionNodes.containsKey(virtualId)) {
+ throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
}
+ virtuaPartitionNodes.put(virtualId,
+ new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
}
- private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop loop, int c) {
- StreamNode source = addNode(-1 * streamNodes.size(), StreamIterationHead.class, null, null);
- sources.add(source.getId());
-
- StreamNode sink = addNode(-1 * streamNodes.size(), StreamIterationTail.class, null, null);
-
- source.setOperatorName("IterationSource-" + loop.getID() + "_" + c);
- sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c);
- vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c);
- vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c);
- vertexIDtoLoop.put(source.getId(), loop);
- vertexIDtoLoop.put(sink.getId(), loop);
- loop.addSourceSinkPair(source, sink);
+ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
+ addEdgeInternal(upStreamVertexID,
+ downStreamVertexID,
+ typeNumber,
+ null,
+ Lists.<String>newArrayList());
- return new Tuple2<StreamNode, StreamNode>(source, sink);
}
- protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
- StreamOperator<?> operatorObject, String operatorName) {
+ private void addEdgeInternal(Integer upStreamVertexID,
+ Integer downStreamVertexID,
+ int typeNumber,
+ StreamPartitioner<?> partitioner,
+ List<String> outputNames) {
- StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
- new ArrayList<OutputSelector<?>>(), vertexClass);
- streamNodes.put(vertexID, vertex);
+ if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+ int virtualId = upStreamVertexID;
+ upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
+ if (outputNames.isEmpty()) {
+ // selections that happen downstream override earlier selections
+ outputNames = virtualSelectNodes.get(virtualId).f1;
+ }
+ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+ } else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
+ int virtualId = upStreamVertexID;
+ upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
+ if (partitioner == null) {
+ partitioner = virtuaPartitionNodes.get(virtualId).f1;
+ }
+ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+ } else {
+ StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+ StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+ // If no partitioner was specified and the parallelism of upstream and downstream
+ // operator matches use forward partitioning, use rebalance otherwise.
+ if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
+ partitioner = new ForwardPartitioner<Object>();
+ } else if (partitioner == null) {
+ partitioner = new RebalancePartitioner<Object>();
+ }
- return vertex;
- }
+ if (partitioner instanceof ForwardPartitioner) {
+ if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
+ throw new UnsupportedOperationException("Forward partitioning does not allow " +
+ "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
+ ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
+ " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
+ }
+ }
- public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
- StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
+ StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
- StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID),
- getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject);
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
+ getStreamNode(edge.getSourceId()).addOutEdge(edge);
+ getStreamNode(edge.getTargetId()).addInEdge(edge);
+ }
}
public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
- getStreamNode(vertexID).addOutputSelector(outputSelector);
+ if (virtuaPartitionNodes.containsKey(vertexID)) {
+ addOutputSelector(virtuaPartitionNodes.get(vertexID).f0, outputSelector);
+ } else if (virtualSelectNodes.containsKey(vertexID)) {
+ addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
+ } else {
+ getStreamNode(vertexID).addOutputSelector(outputSelector);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for {}", vertexID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Outputselector set for {}", vertexID);
+ }
}
}
public void setParallelism(Integer vertexID, int parallelism) {
- getStreamNode(vertexID).setParallelism(parallelism);
+ // Look the node up once. IDs without a StreamNode (presumably getStreamNode returns null
+ // for virtual partition/select IDs) are silently ignored, as before.
+ StreamNode node = getStreamNode(vertexID);
+ if (node != null) {
+ node.setParallelism(parallelism);
+ }
}
public void setKey(Integer vertexID, KeySelector<?, ?> key) {
@@ -437,17 +368,19 @@ public class StreamGraph extends StreamingPlan {
}
public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
- getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
+ // Look the node up once. Transformations that create no node of their own (partition,
+ // select, union) have no StreamNode for their ID; presumably getStreamNode returns null
+ // for them, and the timeout is then ignored.
+ StreamNode node = getStreamNode(vertexID);
+ if (node != null) {
+ node.setBufferTimeout(bufferTimeout);
+ }
}
- private void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
+ public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
StreamNode vertex = getStreamNode(vertexID);
vertex.setSerializerIn1(in1);
vertex.setSerializerIn2(in2);
vertex.setSerializerOut(out);
}
- private void setSerializersFrom(Integer from, Integer to) {
+ public void setSerializersFrom(Integer from, Integer to) {
StreamNode fromVertex = getStreamNode(from);
StreamNode toVertex = getStreamNode(to);
@@ -469,6 +402,10 @@ public class StreamGraph extends StreamingPlan {
public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
StreamNode node = getStreamNode(vertexID);
+ if (node == null) {
+ return;
+ }
+
switch (strategy) {
case ISOLATE:
node.isolateSlot();
@@ -506,6 +443,11 @@ public class StreamGraph extends StreamingPlan {
return sources;
}
+
+ /**
+ * Returns the IDs of all nodes registered as sinks, including iteration sinks.
+ * The returned collection is the graph's internal set, not a copy.
+ */
+ public Collection<Integer> getSinkIDs() {
+ return this.sinks;
+ }
+
public Collection<StreamNode> getStreamNodes() {
return streamNodes.values();
}
@@ -519,20 +461,44 @@ public class StreamGraph extends StreamingPlan {
return operatorSet;
}
- public Collection<StreamLoop> getStreamLoops() {
- return streamLoops.values();
+ public String getBrokerID(Integer vertexID) {
+ return vertexIDtoBrokerID.get(vertexID);
}
- public Integer getLoopID(Integer vertexID) {
- return vertexIDtoLoop.get(vertexID).getID();
+ /**
+ * Returns the loop timeout recorded for an iteration source or sink.
+ *
+ * @param vertexID ID of an iteration source or sink created by createIterationSourceAndSink
+ * @return the timeout passed when the iteration nodes were created
+ * @throws IllegalArgumentException if no loop timeout is recorded for {@code vertexID}
+ */
+ public long getLoopTimeout(Integer vertexID) {
+ Long timeout = vertexIDtoLoopTimeout.get(vertexID);
+ if (timeout == null) {
+ // unboxing null would otherwise fail with an uninformative NullPointerException
+ throw new IllegalArgumentException("Vertex " + vertexID + " is not an iteration source or sink.");
+ }
+ return timeout;
}
- public String getBrokerID(Integer vertexID) {
- return vertexIDtoBrokerID.get(vertexID);
+ /**
+ * Creates the source/sink node pair that carries the feedback of one iteration.
+ *
+ * <p>
+ * The source ({@code StreamIterationHead}) is registered as a graph source and the sink
+ * ({@code StreamIterationTail}) as a graph sink. Both get the given parallelism, the broker ID
+ * {@code "broker-" + loopId} and the given loop timeout. The pair is also added to the set
+ * returned by {@link #getIterationSourceSinkPairs()}.
+ *
+ * @param loopId ID of the iteration, used in the node names and the broker ID
+ * @param sourceId vertex ID for the iteration source
+ * @param sinkId vertex ID for the iteration sink
+ * @param timeout loop timeout recorded for both nodes
+ * @param parallelism parallelism of both nodes
+ * @return a new (source, sink) tuple
+ */
+ public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
+ String brokerId = "broker-" + loopId;
+
+ StreamNode iterationHead = this.addNode(sourceId, StreamIterationHead.class, null, null);
+ sources.add(iterationHead.getId());
+ setParallelism(iterationHead.getId(), parallelism);
+
+ StreamNode iterationTail = this.addNode(sinkId, StreamIterationTail.class, null, null);
+ sinks.add(iterationTail.getId());
+ setParallelism(iterationTail.getId(), parallelism);
+
+ iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(iterationHead, iterationTail));
+
+ iterationHead.setOperatorName("IterationSource-" + loopId);
+ iterationTail.setOperatorName("IterationSink-" + loopId);
+ this.vertexIDtoBrokerID.put(iterationHead.getId(), brokerId);
+ this.vertexIDtoBrokerID.put(iterationTail.getId(), brokerId);
+ this.vertexIDtoLoopTimeout.put(iterationHead.getId(), timeout);
+ this.vertexIDtoLoopTimeout.put(iterationTail.getId(), timeout);
+
+ // a fresh tuple, distinct from the one stored in iterationSourceSinkPairs
+ return new Tuple2<StreamNode, StreamNode>(iterationHead, iterationTail);
}
- public long getLoopTimeout(Integer vertexID) {
- return vertexIDtoLoop.get(vertexID).getTimeout();
+ /**
+ * Returns all (source, sink) node pairs created by createIterationSourceAndSink.
+ * The returned set is the graph's internal set, not a copy.
+ */
+ public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
+ return this.iterationSourceSinkPairs;
}
protected void removeEdge(StreamEdge edge) {
@@ -570,7 +536,6 @@ public class StreamGraph extends StreamingPlan {
* name of the jobGraph
*/
public JobGraph getJobGraph(String jobGraphName) {
- finalizeLoops();
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
throw new UnsupportedOperationException(
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
new file mode 100644
index 0000000..6df8cb5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A generator that generates a {@link StreamGraph} from a graph of
+ * {@link StreamTransformation StreamTransformations}.
+ *
+ * <p>
+ * This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
+ * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
+ * and add edges from the input nodes to our newly created node. The transformation methods
+ * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
+ * IDs can be returned to be able to deal with feedback transformations and unions.
+ *
+ * <p>
+ * Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
+ * these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
+ * partitioning, selector and so on. When an edge is created from a virtual node to a downstream
+ * node the {@code StreamGraph} resolves the id of the original node and creates an edge
+ * in the graph with the desired property. For example, if you have this graph:
+ *
+ * <pre>
+ * Map-1 -> HashPartition-2 -> Map-3
+ * </pre>
+ *
+ * where the numbers represent transformation IDs. We first recurse all the way down. {@code Map-1}
+ * is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
+ * {@code HashPartition}; for this, we create a virtual node with ID 4 that holds the property
+ * {@code HashPartition}. This transformation returns the ID 4. Then we transform the {@code Map-3}.
+ * We add the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and
+ * creates an edge {@code 1 -> 3} with the property HashPartition.
+ */
+public class StreamGraphGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
+
+ // The StreamGraph that is being built, this is initialized at the beginning.
+ private StreamGraph streamGraph;
+
+ private final StreamExecutionEnvironment env;
+
+ // Counter for iteration source/sink node IDs. It counts down from 0, so the handed-out IDs
+ // are negative (presumably to stay clear of regular node IDs — TODO confirm).
+ protected static Integer iterationIdCounter = 0;
+
+ /**
+ * Returns a new ID for an iteration source or sink node by decrementing the shared counter.
+ *
+ * <p>
+ * NOTE(review): the counter is static and updated without synchronization.
+ */
+ public static int getNewIterationNodeId() {
+ return --iterationIdCounter;
+ }
+
+ // Keep track of which Transforms we have already transformed, this is necessary because
+ // we have loops, i.e. feedback edges.
+ private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
+
+
+ /**
+ * Creates a generator with a fresh {@link StreamGraph} configured from the environment
+ * (chaining, checkpointing, state handle provider, forced checkpointing).
+ * Private: the generator should only be invoked using {@link #generate}.
+ */
+ private StreamGraphGenerator(StreamExecutionEnvironment env) {
+ this.env = env;
+ this.alreadyTransformed = Maps.newHashMap();
+
+ StreamGraph graph = new StreamGraph(env);
+ graph.setChaining(env.isChainingEnabled());
+ // checkpointing settings are only copied when a positive interval is configured
+ if (env.getCheckpointInterval() > 0) {
+ graph.setCheckpointingEnabled(true);
+ graph.setCheckpointingInterval(env.getCheckpointInterval());
+ graph.setCheckpointingMode(env.getCheckpointingMode());
+ }
+ graph.setStateHandleProvider(env.getStateHandleProvider());
+ if (env.isForceCheckpointing()) {
+ graph.forceCheckpoint();
+ }
+ this.streamGraph = graph;
+ }
+
+ /**
+ * Builds a {@code StreamGraph} by walking the graph of {@code StreamTransformations},
+ * starting from the given transformations.
+ *
+ * @param env The {@code StreamExecutionEnvironment} whose settings configure the job
+ * @param transformations The transformations from which the traversal starts
+ *
+ * @return The generated {@code StreamGraph}
+ */
+ public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
+ StreamGraphGenerator generator = new StreamGraphGenerator(env);
+ return generator.generateInternal(transformations);
+ }
+
+ /**
+ * Transforms every given transformation (inputs are handled recursively) and returns the
+ * resulting graph.
+ */
+ private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
+ for (StreamTransformation<?> root : transformations) {
+ transform(root);
+ }
+ return this.streamGraph;
+ }
+
+ /**
+ * Transforms one {@code StreamTransformation}.
+ *
+ * <p>
+ * This checks whether we already transformed it and exits early in that case. If not it
+ * delegates to one of the transformation specific methods, remembers the result and applies
+ * the transformation's buffer timeout and resource strategy when they are set.
+ *
+ * @param transform The transformation to add to the {@code StreamGraph}
+ * @return The IDs of the (possibly virtual) nodes representing {@code transform}; downstream
+ * transformations connect to these IDs
+ * @throws IllegalStateException if the type of {@code transform} is not supported
+ */
+ private Collection<Integer> transform(StreamTransformation<?> transform) {
+
+ if (alreadyTransformed.containsKey(transform)) {
+ return alreadyTransformed.get(transform);
+ }
+
+ // parameterized message: no string is built unless debug logging is enabled
+ LOG.debug("Transforming {}", transform);
+
+ // call at least once to trigger exceptions about MissingTypeInfo
+ transform.getOutputType();
+
+ Collection<Integer> transformedIds;
+ if (transform instanceof OneInputTransformation<?, ?>) {
+ transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
+ } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
+ transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
+ } else if (transform instanceof SourceTransformation<?>) {
+ transformedIds = transformSource((SourceTransformation<?>) transform);
+ } else if (transform instanceof SinkTransformation<?>) {
+ transformedIds = transformSink((SinkTransformation<?>) transform);
+ } else if (transform instanceof UnionTransformation<?>) {
+ transformedIds = transformUnion((UnionTransformation<?>) transform);
+ } else if (transform instanceof SplitTransformation<?>) {
+ transformedIds = transformSplit((SplitTransformation<?>) transform);
+ } else if (transform instanceof SelectTransformation<?>) {
+ transformedIds = transformSelect((SelectTransformation<?>) transform);
+ } else if (transform instanceof FeedbackTransformation<?>) {
+ transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
+ } else if (transform instanceof CoFeedbackTransformation<?>) {
+ transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
+ } else if (transform instanceof PartitionTransformation<?>) {
+ transformedIds = transformPartition((PartitionTransformation<?>) transform);
+ } else {
+ throw new IllegalStateException("Unknown transformation: " + transform);
+ }
+
+ // need this check because the iterate transformation adds itself before
+ // transforming the feedback edges
+ if (!alreadyTransformed.containsKey(transform)) {
+ alreadyTransformed.put(transform, transformedIds);
+ }
+
+ if (transform.getBufferTimeout() > 0) {
+ streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
+ }
+ if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
+ streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
+ }
+
+ return transformedIds;
+ }
+
+ /**
+ * Transforms a {@code UnionTransformation}.
+ *
+ * <p>
+ * A union creates no node of its own: each input is transformed and the IDs of all resulting
+ * nodes are returned together, so downstream operations connect to every upstream node.
+ */
+ private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
+ List<Integer> unionIds = Lists.newArrayList();
+ for (StreamTransformation<T> unionInput : union.getInputs()) {
+ Collection<Integer> inputIds = transform(unionInput);
+ unionIds.addAll(inputIds);
+ }
+ return unionIds;
+ }
+
+ /**
+ * Transforms a {@code PartitionTransformation}.
+ *
+ * <p>
+ * For every node the input was transformed to, a virtual node carrying the partitioner is
+ * registered in the {@code StreamGraph}; the IDs of these virtual nodes are returned.
+ * @see StreamGraphGenerator
+ */
+ private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
+ Collection<Integer> upstreamIds = transform(partition.getInput());
+ List<Integer> partitionNodeIds = Lists.newArrayList();
+
+ for (Integer upstreamId : upstreamIds) {
+ int partitionNodeId = StreamTransformation.getNewNodeId();
+ streamGraph.addVirtualPartitionNode(upstreamId, partitionNodeId, partition.getPartitioner());
+ partitionNodeIds.add(partitionNodeId);
+ }
+
+ return partitionNodeIds;
+ }
+
+ /**
+ * Transforms a {@code SplitTransformation}.
+ *
+ * <p>
+ * A split creates no node of its own: its output selector is attached to every node the
+ * input was transformed to, and those same IDs are returned.
+ */
+ private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
+ Collection<Integer> inputIds = transform(split.getInput());
+
+ // transforming the input recursively may already have transformed this split
+ if (alreadyTransformed.containsKey(split)) {
+ return alreadyTransformed.get(split);
+ }
+
+ for (int inputId : inputIds) {
+ streamGraph.addOutputSelector(inputId, split.getOutputSelector());
+ }
+ return inputIds;
+ }
+
+    /**
+     * Transforms a {@code SelectTransformation}.
+     *
+     * <p>
+     * For every ID the input was transformed into, this adds a virtual select node to the
+     * {@code StreamGraph} that holds the selected output names. The IDs of these virtual nodes
+     * are returned.
+     *
+     * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+     */
+    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
+        Collection<Integer> upstreamIds = transform(select.getInput());
+
+        // the recursive call above may already have transformed this select
+        if (alreadyTransformed.containsKey(select)) {
+            return alreadyTransformed.get(select);
+        }
+
+        List<Integer> selectNodeIds = Lists.newArrayList();
+        for (int upstreamId : upstreamIds) {
+            int selectNodeId = StreamTransformation.getNewNodeId();
+            streamGraph.addVirtualSelectNode(upstreamId, selectNodeId, select.getSelectedNames());
+            selectNodeIds.add(selectNodeId);
+        }
+        return selectNodeIds;
+    }
+
+    /**
+     * Transforms a {@code FeedbackTransformation}.
+     *
+     * <p>
+     * The input is transformed first. Then an iteration source/sink pair is created in the
+     * {@code StreamGraph}. The returned IDs are the input IDs followed by the iteration source ID,
+     * so downstream operations are wired to both the original input and the fed-back elements.
+     * Finally, every feedback edge is transformed and connected to the iteration sink.
+     *
+     * @throws IllegalStateException if the iteration has no feedback edges
+     */
+    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
+        if (iterate.getFeedbackEdges().isEmpty()) {
+            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
+        }
+
+        // the IDs of the transformed input form the first part of the result
+        List<Integer> headIds = Lists.newArrayList(transform(iterate.getInput()));
+
+        // transforming the input may already have reached this iteration recursively
+        if (alreadyTransformed.containsKey(iterate)) {
+            return alreadyTransformed.get(iterate);
+        }
+
+        // placeholder iteration source/sink pair through which the feedback flows
+        Tuple2<StreamNode, StreamNode> sourceAndSink = streamGraph.createIterationSourceAndSink(
+                iterate.getId(),
+                getNewIterationNodeId(),
+                getNewIterationNodeId(),
+                iterate.getWaitTime(),
+                iterate.getParallelism());
+        StreamNode feedbackSource = sourceAndSink.f0;
+        StreamNode feedbackSink = sourceAndSink.f1;
+
+        // The source gets a serializer in the last slot and the sink in the first slot.
+        // NOTE(review): presumably setSerializers(id, in1, in2, out); confirm.
+        streamGraph.setSerializers(feedbackSource.getId(), null, null,
+                iterate.getOutputType().createSerializer(env.getConfig()));
+        streamGraph.setSerializers(feedbackSink.getId(),
+                iterate.getOutputType().createSerializer(env.getConfig()), null, null);
+
+        // downstream operators read from both the original input and the feedback source
+        headIds.add(feedbackSource.getId());
+
+        // Record the result before visiting the feedback edges, so that their recursive
+        // transformation stops when it reaches this iteration.
+        alreadyTransformed.put(iterate, headIds);
+
+        for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
+            for (Integer feedbackId : transform(feedbackEdge)) {
+                streamGraph.addEdge(feedbackId, feedbackSink.getId(), 0);
+            }
+        }
+
+        return headIds;
+    }
+
+    /**
+     * Transforms a {@code CoFeedbackTransformation}.
+     *
+     * <p>
+     * Only the feedback side is handled here. The upstream input is neither transformed nor
+     * returned by this method; it is wired directly to the first input of the following
+     * Co-Transform. The returned collection holds only the ID of the newly created iteration
+     * source, which is meant to be wired to the second input of that Co-Transform.
+     *
+     * <p>
+     * This creates the iteration source/sink pair in the {@code StreamGraph} and connects every
+     * transformed feedback edge to the iteration sink.
+     */
+    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
+        // placeholder iteration source/sink pair through which the feedback flows
+        Tuple2<StreamNode, StreamNode> sourceAndSink = streamGraph.createIterationSourceAndSink(
+                coIterate.getId(),
+                getNewIterationNodeId(),
+                getNewIterationNodeId(),
+                coIterate.getWaitTime(),
+                coIterate.getParallelism());
+        StreamNode feedbackSource = sourceAndSink.f0;
+        StreamNode feedbackSink = sourceAndSink.f1;
+
+        streamGraph.setSerializers(feedbackSource.getId(), null, null,
+                coIterate.getOutputType().createSerializer(env.getConfig()));
+        streamGraph.setSerializers(feedbackSink.getId(),
+                coIterate.getOutputType().createSerializer(env.getConfig()), null, null);
+
+        Collection<Integer> feedbackSourceIds = Collections.singleton(feedbackSource.getId());
+
+        // Record the result before visiting the feedback edges, so that their recursive
+        // transformation stops when it reaches this co-iteration.
+        alreadyTransformed.put(coIterate, feedbackSourceIds);
+
+        for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
+            for (Integer feedbackId : transform(feedbackEdge)) {
+                streamGraph.addEdge(feedbackId, feedbackSink.getId(), 0);
+            }
+        }
+
+        return feedbackSourceIds;
+    }
+
+    /**
+     * Transforms a {@code SourceTransformation}.
+     *
+     * <p>
+     * This adds a source node named {@code "Source: " + name} and sets its parallelism. If the
+     * user function is a {@code FileSourceFunction}, its input format is also registered for the
+     * node. Only the source's own ID is returned.
+     */
+    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
+        streamGraph.addSource(source.getId(),
+                source.getOperator(),
+                null,
+                source.getOutputType(),
+                "Source: " + source.getName());
+
+        Object userFunction = source.getOperator().getUserFunction();
+        if (userFunction instanceof FileSourceFunction) {
+            FileSourceFunction<T> fileSource = (FileSourceFunction<T>) userFunction;
+            streamGraph.setInputFormat(source.getId(), fileSource.getFormat());
+        }
+
+        streamGraph.setParallelism(source.getId(), source.getParallelism());
+        return Collections.singleton(source.getId());
+    }
+
+    /**
+     * Transforms a {@code SinkTransformation}.
+     *
+     * <p>
+     * This recursively transforms the input. It then adds a sink node named
+     * {@code "Sink: " + name}, whose input type is the output type of the input transformation,
+     * and sets the node's parallelism. Every input ID is wired to the sink node. If the sink has a
+     * state key selector, it is also registered for the sink node.
+     *
+     * @return an empty collection; a sink exposes no IDs that downstream operations could be
+     *         connected to
+     */
+    private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
+
+        Collection<Integer> inputIds = transform(sink.getInput());
+
+        streamGraph.addSink(sink.getId(),
+                sink.getOperator(),
+                sink.getInput().getOutputType(),
+                null,
+                "Sink: " + sink.getName());
+
+        streamGraph.setParallelism(sink.getId(), sink.getParallelism());
+
+        for (Integer inputId : inputIds) {
+            streamGraph.addEdge(inputId, sink.getId(), 0);
+        }
+
+        if (sink.getStateKeySelector() != null) {
+            streamGraph.setKey(sink.getId(), sink.getStateKeySelector());
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * Transforms a {@code OneInputTransformation}.
+     *
+     * <p>
+     * This recursively transforms the input and adds a new operator node to the
+     * {@code StreamGraph}. It registers the state key selector if one is set, sets the node's
+     * parallelism, and wires every input ID to the new node. Those edges pass {@code 0} as the
+     * last {@code addEdge} argument; two-input operators use {@code 1} and {@code 2}.
+     */
+    private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
+        Collection<Integer> upstreamIds = transform(transform.getInput());
+
+        // transforming the input may already have reached this node recursively
+        if (alreadyTransformed.containsKey(transform)) {
+            return alreadyTransformed.get(transform);
+        }
+
+        streamGraph.addOperator(transform.getId(),
+                transform.getOperator(),
+                transform.getInputType(),
+                transform.getOutputType(),
+                transform.getName());
+
+        if (transform.getStateKeySelector() != null) {
+            streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+        }
+        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+
+        for (Integer upstreamId : upstreamIds) {
+            streamGraph.addEdge(upstreamId, transform.getId(), 0);
+        }
+
+        return Collections.singleton(transform.getId());
+    }
+
+    /**
+     * Transforms a {@code TwoInputTransformation}.
+     *
+     * <p>
+     * Both inputs are transformed recursively. A co-operator node is then added to the
+     * {@code StreamGraph} with this transformation's parallelism. IDs produced by the first input
+     * are wired to it passing {@code 1} to {@code addEdge}; IDs produced by the second input
+     * pass {@code 2}.
+     */
+    private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> transform) {
+        Collection<Integer> firstInputIds = transform(transform.getInput1());
+        Collection<Integer> secondInputIds = transform(transform.getInput2());
+
+        // one of the recursive calls may already have transformed this node
+        if (alreadyTransformed.containsKey(transform)) {
+            return alreadyTransformed.get(transform);
+        }
+
+        streamGraph.addCoOperator(
+                transform.getId(),
+                transform.getOperator(),
+                transform.getInputType1(),
+                transform.getInputType2(),
+                transform.getOutputType(),
+                transform.getName());
+        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+
+        for (Integer firstId : firstInputIds) {
+            streamGraph.addEdge(firstId, transform.getId(), 1);
+        }
+        for (Integer secondId : secondInputIds) {
+            streamGraph.addEdge(secondId, transform.getId(), 2);
+        }
+
+        return Collections.singleton(transform.getId());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
deleted file mode 100644
index ba987ef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * Object for representing loops (iterations) in streaming programs.
- *
- * <p>
- * A loop records its ID, its head and tail operators (each tail with a partitioner and a list
- * of selected output names), a timeout, an optional feedback type, and the (source, sink) node
- * pairs registered for it.
- */
-public class StreamLoop {
-
- private Integer loopID;
-
- // tailPartitioners and tailSelectedNames hold one entry per tail operator, in the same order
- // as tailOperators (all three are appended together in addTailOperator)
- private List<StreamNode> headOperators = new ArrayList<StreamNode>();
- private List<StreamNode> tailOperators = new ArrayList<StreamNode>();
- private List<StreamPartitioner<?>> tailPartitioners = new ArrayList<StreamPartitioner<?>>();
- private List<List<String>> tailSelectedNames = new ArrayList<List<String>>();
-
- // both are only set by the constructor, when a non-null feedback type is given
- private boolean coIteration = false;
- private TypeInformation<?> feedbackType = null;
-
- private long timeout;
- // enabled by the constructor for co-iterations, or later via applyTailPartitioning()
- private boolean tailPartitioning = false;
-
- private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new ArrayList<Tuple2<StreamNode, StreamNode>>();
-
- /**
- * Creates a loop.
- *
- * @param loopID the ID of the loop
- * @param timeout the timeout of the loop, as returned by getTimeout()
- * @param feedbackType the feedback type, may be null; a non-null value marks the loop as a
- * co-iteration and enables tail partitioning
- */
- public StreamLoop(Integer loopID, long timeout, TypeInformation<?> feedbackType) {
- this.loopID = loopID;
- this.timeout = timeout;
- if (feedbackType != null) {
- this.feedbackType = feedbackType;
- coIteration = true;
- tailPartitioning = true;
- }
- }
-
- /** Returns the ID of this loop. */
- public Integer getID() {
- return loopID;
- }
-
- /** Returns the timeout given to the constructor. */
- public long getTimeout() {
- return timeout;
- }
-
- /** Returns true if a non-null feedback type was given to the constructor. */
- public boolean isCoIteration() {
- return coIteration;
- }
-
- /** Returns the feedback type, or null if this loop is not a co-iteration. */
- public TypeInformation<?> getFeedbackType() {
- return feedbackType;
- }
-
- /** Registers a (source, sink) node pair for this loop. */
- public void addSourceSinkPair(StreamNode source, StreamNode sink) {
- this.sourcesAndSinks.add(new Tuple2<StreamNode, StreamNode>(source, sink));
- }
-
- /** Returns the registered (source, sink) pairs; this is the internal list, not a copy. */
- public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() {
- return this.sourcesAndSinks;
- }
-
- /** Adds a head operator to this loop. */
- public void addHeadOperator(StreamNode head) {
- this.headOperators.add(head);
- }
-
- /** Adds a tail operator together with its partitioner and selected output names. */
- public void addTailOperator(StreamNode tail, StreamPartitioner<?> partitioner,
- List<String> selectedNames) {
- this.tailOperators.add(tail);
- this.tailPartitioners.add(partitioner);
- this.tailSelectedNames.add(selectedNames);
- }
-
- /** Enables tail partitioning for this loop. */
- public void applyTailPartitioning() {
- this.tailPartitioning = true;
- }
-
- /** Returns whether tail partitioning is enabled. */
- public boolean keepsPartitioning() {
- return tailPartitioning;
- }
-
- /** Returns the head operators; this is the internal list, not a copy. */
- public List<StreamNode> getHeads() {
- return headOperators;
- }
-
- /** Returns the tail operators; this is the internal list, not a copy. */
- public List<StreamNode> getTails() {
- return tailOperators;
- }
-
- /** Returns the per-tail partitioners, index-aligned with getTails(). */
- public List<StreamPartitioner<?>> getTailPartitioners() {
- return tailPartitioners;
- }
-
- /** Returns the per-tail selected output names, index-aligned with getTails(). */
- public List<List<String>> getTailSelectedNames() {
- return tailSelectedNames;
- }
-
- // multi-line debug output: ID, heads, tails, tail partitioners (TP), tail selected names (TSN)
- @Override
- public String toString() {
- return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" + "Tail: " + tailOperators
- + "\n" + "TP: " + tailPartitioners + "\n" + "TSN: " + tailSelectedNames;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 62e2d83..9110cd3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -125,7 +125,11 @@ public class StreamNode implements Serializable {
}
public int getParallelism() {
- return parallelism != null ? parallelism : env.getParallelism();
+ if (parallelism == -1) {
+ return env.getParallelism();
+ } else {
+ return parallelism;
+ }
}
public void setParallelism(Integer parallelism) {
@@ -218,7 +222,7 @@ public class StreamNode implements Serializable {
@Override
public String toString() {
- return operatorName + id;
+ // renders as "<operatorName>-<id>"; the dash keeps the name and the node ID apart
+ return operatorName + "-" + id;
}
public KeySelector<?, ?> getStatePartitioner() {
@@ -228,4 +232,23 @@ public class StreamNode implements Serializable {
+    /**
+     * Sets the {@code KeySelector} stored as this node's state partitioner.
+     */
public void setStatePartitioner(KeySelector<?, ?> statePartitioner) {
this.statePartitioner = statePartitioner;
}
+
+    /**
+     * Two {@code StreamNode}s are equal when they have exactly the same runtime class and equal
+     * IDs.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o != null && o.getClass() == getClass()) {
+            return id.equals(((StreamNode) o).id);
+        }
+        return false;
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: the hash is derived solely from the node ID.
+     */
+    @Override
+    public int hashCode() {
+        return this.id.hashCode();
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5280fb2..314d1b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -44,8 +44,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.PartitioningStrategy;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.InstantiationUtil;
@@ -294,11 +294,6 @@ public class StreamingJobGraphGenerator {
List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
allOutputs.addAll(nonChainableOutputs);
- for (StreamEdge output : allOutputs) {
- config.setSelectedNames(output.getTargetId(),
- streamGraph.getStreamEdge(vertexID, output.getTargetId()).getSelectedNames());
- }
-
vertexConfigs.put(vertexID, config);
}
@@ -316,7 +311,7 @@ public class StreamingJobGraphGenerator {
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
StreamPartitioner<?> partitioner = edge.getPartitioner();
- if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
+ if (partitioner instanceof ForwardPartitioner) {
downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
} else {
downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
@@ -345,7 +340,7 @@ public class StreamingJobGraphGenerator {
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
- && (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
+ && (edge.getPartitioner() instanceof ForwardPartitioner || downStreamVertex
.getParallelism() == 1)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& (streamGraph.isChainingEnabled() ||
@@ -370,21 +365,19 @@ public class StreamingJobGraphGenerator {
}
}
- for (StreamLoop loop : streamGraph.getStreamLoops()) {
- for (Tuple2<StreamNode, StreamNode> pair : loop.getSourceSinkPairs()) {
-
- CoLocationGroup ccg = new CoLocationGroup();
-
- JobVertex source = jobVertices.get(pair.f0.getId());
- JobVertex sink = jobVertices.get(pair.f1.getId());
-
- ccg.addVertex(source);
- ccg.addVertex(sink);
- source.updateCoLocationGroup(ccg);
- sink.updateCoLocationGroup(ccg);
- }
+ for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
+ CoLocationGroup ccg = new CoLocationGroup();
+
+ JobVertex source = jobVertices.get(pair.f0.getId());
+ JobVertex sink = jobVertices.get(pair.f1.getId());
+
+ ccg.addVertex(source);
+ ccg.addVertex(sink);
+ source.updateCoLocationGroup(ccg);
+ sink.updateCoLocationGroup(ccg);
}
+
}
private void configureCheckpointing() {
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index dce7684..cbd2a40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
public class WindowingOptimizer {
@@ -64,8 +63,7 @@ public class WindowingOptimizer {
StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
// We connect the merge input to the flattener directly
- streamGraph.addEdge(mergeInput.getId(), flattenerId,
- new RebalancePartitioner(true), 0, new ArrayList<String>());
+ streamGraph.addEdge(mergeInput.getId(), flattenerId, 0);
// If the merger is only connected to the flattener we delete it
// completely, otherwise we only remove the edge
@@ -107,8 +105,7 @@ public class WindowingOptimizer {
for (StreamEdge edge1 : discretizer.getInEdges()) {
for (StreamEdge edge2 : candidate.f1.get(0).getInEdges()) {
- if (edge1.getPartitioner().getStrategy() != edge2.getPartitioner()
- .getStrategy()) {
+ if (edge1.getPartitioner().getClass() != edge2.getPartitioner().getClass()) {
partitionersMatch = false;
}
}
@@ -155,8 +152,7 @@ public class WindowingOptimizer {
for (int i = 0; i < numOutputs; i++) {
StreamEdge outEdge = outEdges.get(i);
- streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), outEdge.getPartitioner(), 0,
- new ArrayList<String>());
+ streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), 0);
}
// Remove the other discretizer
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
new file mode 100644
index 0000000..67ccbd6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This represents a feedback point in a topology. The type of the feedback elements need not match
+ * the type of the upstream {@code StreamTransformation}, because the only allowed operations
+ * after a {@code CoFeedbackTransformation} are
+ * {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
+ * The upstream {@code StreamTransformation} will be connected to the first input of the Co-Transform,
+ * while the feedback edges will be connected to the second input.
+ *
+ * <p>
+ * Both the partitioning of the input and the partitioning of the feedback edges are preserved.
+ * They can also use differing partitioning strategies. However, the parallelism of the feedback
+ * {@code StreamTransformations} must match the parallelism of the input
+ * {@code StreamTransformation}.
+ *
+ * <p>
+ * The upstream {@code StreamTransformation} is not wired to this {@code CoFeedbackTransformation}.
+ * Instead, it is wired directly to the {@code TwoInputTransformation} that follows this
+ * {@code CoFeedbackTransformation}.
+ *
+ * <p>
+ * This is different from Iterations in batch processing.
+ * @see org.apache.flink.streaming.api.transformations.FeedbackTransformation
+ *
+ * @param <F> The type of the feedback elements.
+ *
+ */
+public class CoFeedbackTransformation<F> extends StreamTransformation<F> {
+
+    private final List<StreamTransformation<F>> feedbackEdges;
+
+    private final Long waitTime;
+
+    /**
+     * Creates a new {@code CoFeedbackTransformation}.
+     *
+     * @param parallelism The parallelism of the upstream {@code StreamTransformation} and the
+     *                    feedback edges.
+     * @param feedbackType The type of the feedback edges
+     * @param waitTime The wait time of the feedback operator. After the time expires
+     *                 the operation will close and not receive any more feedback elements.
+     */
+    public CoFeedbackTransformation(int parallelism,
+            TypeInformation<F> feedbackType,
+            Long waitTime) {
+        super("CoFeedback", feedbackType, parallelism);
+        this.waitTime = waitTime;
+        this.feedbackEdges = Lists.newArrayList();
+    }
+
+    /**
+     * Adds a feedback edge. The parallelism of the given {@code StreamTransformation} must match
+     * the parallelism of this {@code CoFeedbackTransformation}.
+     *
+     * @param transform The new feedback {@code StreamTransformation}.
+     * @throws UnsupportedOperationException if the parallelism of {@code transform} differs from
+     *         the parallelism of this transformation
+     */
+    public void addFeedbackEdge(StreamTransformation<F> transform) {
+
+        if (transform.getParallelism() != this.getParallelism()) {
+            throw new UnsupportedOperationException(
+                    "Parallelism of the feedback stream must match the parallelism of the original" +
+                    " stream. Parallelism of original stream: " + this.getParallelism() +
+                    "; parallelism of feedback stream: " + transform.getParallelism());
+        }
+
+        feedbackEdges.add(transform);
+    }
+
+    /**
+     * Returns the list of feedback {@code StreamTransformations}.
+     */
+    public List<StreamTransformation<F>> getFeedbackEdges() {
+        return feedbackEdges;
+    }
+
+    /**
+     * Returns the wait time. This is how long the feedback operator keeps listening for
+     * feedback elements. Once the time expires, the operation closes and receives no further
+     * elements.
+     */
+    public Long getWaitTime() {
+        return waitTime;
+    }
+
+    /**
+     * Not supported: the chaining strategy of a {@code CoFeedbackTransformation} cannot be set.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+        throw new UnsupportedOperationException("Cannot set chaining strategy on CoFeedback Transformation.");
+    }
+
+    /**
+     * Returns only this transformation. The upstream input is not an input of a
+     * {@code CoFeedbackTransformation}, and the feedback edges are not included.
+     */
+    @Override
+    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+        return Collections.<StreamTransformation<?>>singleton(this);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
new file mode 100644
index 0000000..11a2f33
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This represents a feedback point in a topology.
+ *
+ * <p>
+ * This is different from how iterations work in batch processing. Once a feedback point is defined,
+ * you can connect one or several {@code StreamTransformations} to it as feedback edges. Operations
+ * downstream from the feedback point will receive elements from the input of this feedback point
+ * and from the feedback edges.
+ *
+ * <p>
+ * Both the partitioning of the input and the partitioning of the feedback edges are preserved.
+ * They can also use differing partitioning strategies. However, the parallelism of the feedback
+ * {@code StreamTransformations} must match the parallelism of the input
+ * {@code StreamTransformation}.
+ *
+ * <p>
+ * The type of the input {@code StreamTransformation} and the type of the feedback
+ * {@code StreamTransformations} must match.
+ *
+ * @param <T> The type of the input elements and the feedback elements.
+ */
+public class FeedbackTransformation<T> extends StreamTransformation<T> {
+
+    private final StreamTransformation<T> input;
+
+    private final List<StreamTransformation<T>> feedbackEdges;
+
+    private final Long waitTime;
+
+    /**
+     * Creates a new {@code FeedbackTransformation} from the given input. The new transformation
+     * takes its output type and parallelism from the input.
+     *
+     * @param input The input {@code StreamTransformation}
+     * @param waitTime The wait time of the feedback operator. After the time expires
+     *                 the operation will close and not receive any more feedback elements.
+     */
+    public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
+        super("Feedback", input.getOutputType(), input.getParallelism());
+        this.input = input;
+        this.waitTime = waitTime;
+        this.feedbackEdges = Lists.newArrayList();
+    }
+
+    /**
+     * Returns the input {@code StreamTransformation} of this {@code FeedbackTransformation}.
+     */
+    public StreamTransformation<T> getInput() {
+        return input;
+    }
+
+    /**
+     * Adds a feedback edge. The parallelism of the given {@code StreamTransformation} must match
+     * the parallelism of this {@code FeedbackTransformation}, which the constructor takes from
+     * the input {@code StreamTransformation}.
+     *
+     * @param transform The new feedback {@code StreamTransformation}.
+     * @throws UnsupportedOperationException if the parallelism of {@code transform} differs from
+     *         the parallelism of this transformation
+     */
+    public void addFeedbackEdge(StreamTransformation<T> transform) {
+
+        if (transform.getParallelism() != this.getParallelism()) {
+            throw new UnsupportedOperationException(
+                    "Parallelism of the feedback stream must match the parallelism of the original" +
+                    " stream. Parallelism of original stream: " + this.getParallelism() +
+                    "; parallelism of feedback stream: " + transform.getParallelism());
+        }
+
+        feedbackEdges.add(transform);
+    }
+
+    /**
+     * Returns the list of feedback {@code StreamTransformations}.
+     */
+    public List<StreamTransformation<T>> getFeedbackEdges() {
+        return feedbackEdges;
+    }
+
+    /**
+     * Returns the wait time. This is how long the feedback operator keeps listening for
+     * feedback elements. Once the time expires, the operation closes and receives no further
+     * elements.
+     */
+    public Long getWaitTime() {
+        return waitTime;
+    }
+
+    /**
+     * Not supported: the chaining strategy of a {@code FeedbackTransformation} cannot be set.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+        throw new UnsupportedOperationException("Cannot set chaining strategy on Feedback Transformation.");
+    }
+
+    /**
+     * Returns this transformation followed by the transitive predecessors of its input.
+     * The feedback edges are not included.
+     */
+    @Override
+    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+        List<StreamTransformation<?>> result = Lists.newArrayList();
+        result.add(this);
+        result.addAll(input.getTransitivePredecessors());
+        return result;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
new file mode 100644
index 0000000..945d8eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This Transformation represents the application of a
+ * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
+ * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
+ *
+ * @param <IN> The type of the elements in the input {@code StreamTransformation}
+ * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
+ */
+public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
+
+ private final StreamTransformation<IN> input;
+
+ // Not set by the constructor; stays null until setStateKeySelector() is called.
+ private KeySelector<IN, ?> stateKeySelector;
+
+ private final OneInputStreamOperator<IN, OUT> operator;
+
+ /**
+ * Creates a new {@code OneInputTransformation} from the given input and operator.
+ *
+ * @param input The input {@code StreamTransformation}
+ * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+ * @param operator The {@code OneInputStreamOperator} that is applied to the input elements
+ * @param outputType The type of the elements produced by this {@code OneInputTransformation}
+ * @param parallelism The parallelism of this {@code OneInputTransformation}
+ */
+ public OneInputTransformation(
+ StreamTransformation<IN> input,
+ String name,
+ OneInputStreamOperator<IN, OUT> operator,
+ TypeInformation<OUT> outputType,
+ int parallelism) {
+ super(name, outputType, parallelism);
+ this.input = input;
+ this.operator = operator;
+ }
+
+ /**
+ * Returns the input {@code StreamTransformation} of this {@code OneInputTransformation}.
+ */
+ public StreamTransformation<IN> getInput() {
+ return input;
+ }
+
+ /**
+ * Returns the {@code TypeInformation} for the elements of the input. This is the output type
+ * of the input {@code StreamTransformation}.
+ */
+ public TypeInformation<IN> getInputType() {
+ return input.getOutputType();
+ }
+
+ /**
+ * Returns the {@code OneInputStreamOperator} of this Transformation.
+ */
+ public OneInputStreamOperator<IN, OUT> getOperator() {
+ return operator;
+ }
+
+ /**
+ * Sets the {@link KeySelector} that must be used for partitioning keyed state of this operation.
+ *
+ * @param stateKeySelector The {@code KeySelector} to set
+ */
+ public void setStateKeySelector(KeySelector<IN, ?> stateKeySelector) {
+ this.stateKeySelector = stateKeySelector;
+ }
+
+ /**
+ * Returns the {@code KeySelector} that must be used for partitioning keyed state in this
+ * Operation, or {@code null} if none has been set.
+ *
+ * @see #setStateKeySelector
+ */
+ public KeySelector<IN, ?> getStateKeySelector() {
+ return stateKeySelector;
+ }
+
+ /**
+ * Returns this transformation followed by all transitive predecessors of its input.
+ */
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ /**
+ * Applies the given chaining strategy to the wrapped {@code OneInputStreamOperator}.
+ */
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ operator.setChainingStrategy(strategy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
new file mode 100644
index 0000000..1165d5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a change of partitioning of the input elements.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code PartitionTransformation}
+ */
+public class PartitionTransformation<T> extends StreamTransformation<T> {
+ private final StreamTransformation<T> input;
+ private final StreamPartitioner<T> partitioner;
+
+ /**
+ * Creates a new {@code PartitionTransformation} from the given input and
+ * {@link StreamPartitioner}. The name is fixed to "Partition", and the output type and
+ * parallelism are taken from the input.
+ *
+ * @param input The input {@code StreamTransformation}
+ * @param partitioner The {@code StreamPartitioner}
+ */
+ public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<T> partitioner) {
+ super("Partition", input.getOutputType(), input.getParallelism());
+ this.input = input;
+ this.partitioner = partitioner;
+ }
+
+ /**
+ * Returns the input {@code StreamTransformation} of this {@code PartitionTransformation}.
+ */
+ public StreamTransformation<T> getInput() {
+ return input;
+ }
+
+ /**
+ * Returns the {@code StreamPartitioner} that must be used for partitioning the elements
+ * of the input {@code StreamTransformation}.
+ */
+ public StreamPartitioner<T> getPartitioner() {
+ return partitioner;
+ }
+
+ /**
+ * Returns this transformation followed by all transitive predecessors of its input.
+ */
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ /**
+ * Changing the chaining strategy is not supported, because this transformation does not
+ * create a physical operation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ throw new UnsupportedOperationException("Cannot set chaining strategy on Partition Transformation.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
new file mode 100644
index 0000000..92033bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a selection of only certain upstream elements. This must
+ * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
+ * splits elements into several logical streams with assigned names.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SelectTransformation}
+ */
+public class SelectTransformation<T> extends StreamTransformation<T> {
+ private final StreamTransformation<T> input;
+ // Assigned once in the constructor; the list is stored as given (not copied).
+ private final List<String> selectedNames;
+
+ /**
+ * Creates a new {@code SelectTransformation} from the given input that only selects
+ * the streams with the selected names. The name is fixed to "Select", and the output type
+ * and parallelism are taken from the input.
+ *
+ * @param input The input {@code StreamTransformation}
+ * @param selectedNames The names from the upstream {@code SplitTransformation} that this
+ * {@code SelectTransformation} selects.
+ */
+ public SelectTransformation(StreamTransformation<T> input,
+ List<String> selectedNames) {
+ super("Select", input.getOutputType(), input.getParallelism());
+ this.input = input;
+ this.selectedNames = selectedNames;
+ }
+
+ /**
+ * Returns the input {@code StreamTransformation}.
+ */
+ public StreamTransformation<T> getInput() {
+ return input;
+ }
+
+ /**
+ * Returns the names of the split streams that this {@code SelectTransformation} selects.
+ */
+ public List<String> getSelectedNames() {
+ return selectedNames;
+ }
+
+ /**
+ * Returns this transformation followed by all transitive predecessors of its input.
+ */
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ /**
+ * Changing the chaining strategy is not supported, because this transformation does not
+ * create a physical operation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
new file mode 100644
index 0000000..2a4e2d0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@code StreamTransformation} that represents a Sink. The output type of this
+ * transformation is always {@code Object}.
+ *
+ * @param <T> The type of the elements consumed from the input {@code StreamTransformation}
+ */
+public class SinkTransformation<T> extends StreamTransformation<Object> {
+
+ private final StreamTransformation<T> input;
+
+ private final StreamSink<T> operator;
+
+ // Sinks can also hold key-partitioned state, so they carry their own state key selector.
+ // Stays null until setStateKeySelector() is called.
+ private KeySelector<T, ?> stateKeySelector;
+
+ /**
+ * Creates a {@code SinkTransformation} that feeds the elements of {@code input} into the
+ * given sink operator.
+ *
+ * @param input The input {@code StreamTransformation}
+ * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+ * @param operator The sink operator
+ * @param parallelism The parallelism of this {@code SinkTransformation}
+ */
+ public SinkTransformation(
+ StreamTransformation<T> input,
+ String name,
+ StreamSink<T> operator,
+ int parallelism) {
+ super(name, TypeExtractor.getForClass(Object.class), parallelism);
+ this.operator = operator;
+ this.input = input;
+ }
+
+ /**
+ * Gets the {@code StreamTransformation} whose elements flow into this sink.
+ */
+ public StreamTransformation<T> getInput() {
+ return this.input;
+ }
+
+ /**
+ * Gets the {@link StreamSink} operator wrapped by this {@code SinkTransformation}.
+ */
+ public StreamSink<T> getOperator() {
+ return this.operator;
+ }
+
+ /**
+ * Applies the given chaining strategy to the wrapped {@code StreamSink}.
+ */
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ this.operator.setChainingStrategy(strategy);
+ }
+
+ /**
+ * Gets the {@code KeySelector} used for partitioning keyed state in this Sink, or
+ * {@code null} if none has been set.
+ *
+ * @see #setStateKeySelector
+ */
+ public KeySelector<T, ?> getStateKeySelector() {
+ return this.stateKeySelector;
+ }
+
+ /**
+ * Sets the {@link KeySelector} used for partitioning keyed state of this Sink.
+ *
+ * @param stateKeySelector The {@code KeySelector} to set
+ */
+ public void setStateKeySelector(KeySelector<T, ?> stateKeySelector) {
+ this.stateKeySelector = stateKeySelector;
+ }
+
+ /**
+ * Returns this sink followed by every transformation reachable upstream through its input.
+ */
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ Collection<StreamTransformation<?>> upstream = this.input.getTransitivePredecessors();
+ List<StreamTransformation<?>> chain = Lists.newArrayListWithCapacity(upstream.size() + 1);
+ chain.add(this);
+ chain.addAll(upstream);
+ return chain;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
new file mode 100644
index 0000000..c14c58c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * This represents a Source. This does not actually transform anything since it has no inputs but
+ * it is the root {@code StreamTransformation} of any topology.
+ *
+ * @param <T> The type of the elements that this source produces
+ */
+public class SourceTransformation<T> extends StreamTransformation<T> {
+
+ private final StreamSource<T> operator;
+
+ /**
+ * Creates a new {@code SourceTransformation} from the given operator.
+ *
+ * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
+ * @param operator The {@code StreamSource} that is the operator of this Transformation
+ * @param outputType The type of the elements produced by this {@code SourceTransformation}
+ * @param parallelism The parallelism of this {@code SourceTransformation}
+ */
+ public SourceTransformation(
+ String name,
+ StreamSource<T> operator,
+ TypeInformation<T> outputType,
+ int parallelism) {
+ super(name, outputType, parallelism);
+ this.operator = operator;
+ }
+
+ /**
+ * Returns the {@code StreamSource}, the operator of this {@code SourceTransformation}.
+ */
+ public StreamSource<T> getOperator() {
+ return operator;
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ return Collections.<StreamTransformation<?>>singleton(this);
+ }
+
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ operator.setChainingStrategy(strategy);
+ }
+}