You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:53 UTC

[37/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
deleted file mode 100644
index 743ee4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-public class JSONGenerator {
-
-	public static final String STEPS = "step_function";
-	public static final String ID = "id";
-	public static final String SIDE = "side";
-	public static final String SHIP_STRATEGY = "ship_strategy";
-	public static final String PREDECESSORS = "predecessors";
-	public static final String TYPE = "type";
-	public static final String PACT = "pact";
-	public static final String CONTENTS = "contents";
-	public static final String PARALLELISM = "parallelism";
-
-	private StreamGraph streamGraph;
-
-	public JSONGenerator(StreamGraph streamGraph) {
-		this.streamGraph = streamGraph;
-	}
-
-	public String getJSON() throws JSONException {
-		JSONObject json = new JSONObject();
-		JSONArray nodes = new JSONArray();
-		json.put("nodes", nodes);
-		List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
-		Collections.sort(operatorIDs, new Comparator<Integer>() {
-			@Override
-			public int compare(Integer o1, Integer o2) {
-				// put sinks at the back
-				if (streamGraph.getSinkIDs().contains(o1)) {
-					return 1;
-				} else if (streamGraph.getSinkIDs().contains(o2)) {
-					return -1;
-				} else {
-					return o1 - o2;
-				}
-			}
-		});
-		visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
-		return json.toString();
-	}
-
-	private void visit(JSONArray jsonArray, List<Integer> toVisit,
-			Map<Integer, Integer> edgeRemapings) throws JSONException {
-
-		Integer vertexID = toVisit.get(0);
-		StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
-		if (streamGraph.getSourceIDs().contains(vertexID)
-				|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
-
-			JSONObject node = new JSONObject();
-			decorateNode(vertexID, node);
-
-			if (!streamGraph.getSourceIDs().contains(vertexID)) {
-				JSONArray inputs = new JSONArray();
-				node.put(PREDECESSORS, inputs);
-
-				for (StreamEdge inEdge : vertex.getInEdges()) {
-					int inputID = inEdge.getSourceId();
-
-					Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
-							.get(inputID) : inputID;
-					decorateEdge(inputs, vertexID, mappedID, inputID);
-				}
-			}
-			jsonArray.put(node);
-			toVisit.remove(vertexID);
-		} else {
-			Integer iterationHead = -1;
-			for (StreamEdge inEdge : vertex.getInEdges()) {
-				int operator = inEdge.getSourceId();
-
-				if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
-					iterationHead = operator;
-				}
-			}
-
-			JSONObject obj = new JSONObject();
-			JSONArray iterationSteps = new JSONArray();
-			obj.put(STEPS, iterationSteps);
-			obj.put(ID, iterationHead);
-			obj.put(PACT, "IterativeDataStream");
-			obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
-			obj.put(CONTENTS, "Stream Iteration");
-			JSONArray iterationInputs = new JSONArray();
-			obj.put(PREDECESSORS, iterationInputs);
-			toVisit.remove(iterationHead);
-			visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
-			jsonArray.put(obj);
-		}
-
-		if (!toVisit.isEmpty()) {
-			visit(jsonArray, toVisit, edgeRemapings);
-		}
-	}
-
-	private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
-			Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
-
-		Integer vertexID = toVisit.get(0);
-		StreamNode vertex = streamGraph.getStreamNode(vertexID);
-		toVisit.remove(vertexID);
-
-		// Ignoring head and tail to avoid redundancy
-		if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
-			JSONObject obj = new JSONObject();
-			jsonArray.put(obj);
-			decorateNode(vertexID, obj);
-			JSONArray inEdges = new JSONArray();
-			obj.put(PREDECESSORS, inEdges);
-
-			for (StreamEdge inEdge : vertex.getInEdges()) {
-				int inputID = inEdge.getSourceId();
-
-				if (edgeRemapings.keySet().contains(inputID)) {
-					decorateEdge(inEdges, vertexID, inputID, inputID);
-				} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
-					decorateEdge(iterationInEdges, vertexID, inputID, inputID);
-				}
-			}
-
-			edgeRemapings.put(vertexID, headId);
-			visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
-		}
-
-	}
-
-	private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
-			throws JSONException {
-		JSONObject input = new JSONObject();
-		inputArray.put(input);
-		input.put(ID, mappedInputID);
-		input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner());
-		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
-	}
-
-	private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
-
-		StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
-		node.put(ID, vertexID);
-		node.put(TYPE, vertex.getOperatorName());
-
-		if (streamGraph.getSourceIDs().contains(vertexID)) {
-			node.put(PACT, "Data Source");
-		} else if (streamGraph.getSinkIDs().contains(vertexID)) {
-			node.put(PACT, "Data Sink");
-		} else {
-			node.put(PACT, "Operator");
-		}
-
-		StreamOperator<?> operator = streamGraph.getStreamNode(vertexID).getOperator();
-
-		node.put(CONTENTS, vertex.getOperatorName());
-
-		node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
deleted file mode 100644
index 11bf84f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
-import org.apache.flink.util.InstantiationUtil;
-
-public class StreamConfig implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// ------------------------------------------------------------------------
-	//  Config Keys
-	// ------------------------------------------------------------------------
-	
-	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
-	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-	private static final String CHAINED_OUTPUTS = "chainedOutputs";
-	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
-	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
-	private static final String VERTEX_NAME = "vertexID";
-	private static final String ITERATION_ID = "iterationId";
-	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
-	private static final String SERIALIZEDUDF = "serializedUDF";
-	private static final String USER_FUNCTION = "userFunction";
-	private static final String BUFFER_TIMEOUT = "bufferTimeout";
-	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
-	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
-	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
-	private static final String ITERATON_WAIT = "iterationWait";
-	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
-	private static final String EDGES_IN_ORDER = "edgesInOrder";
-	private static final String OUT_STREAM_EDGES = "outStreamEdges";
-	private static final String IN_STREAM_EDGES = "inStreamEdges";
-
-	private static final String CHECKPOINTING_ENABLED = "checkpointing";
-	private static final String CHECKPOINT_MODE = "checkpointMode";
-	
-	private static final String STATE_BACKEND = "statebackend";
-	private static final String STATE_PARTITIONER = "statePartitioner";
-	private static final String STATE_KEY_SERIALIZER = "statekeyser";
-	
-	
-	// ------------------------------------------------------------------------
-	//  Default Values
-	// ------------------------------------------------------------------------
-	
-	private static final long DEFAULT_TIMEOUT = 100;
-	private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
-	
-	
-	// ------------------------------------------------------------------------
-	//  Config
-	// ------------------------------------------------------------------------
-
-	private final Configuration config;
-
-	public StreamConfig(Configuration config) {
-		this.config = config;
-	}
-
-	public Configuration getConfiguration() {
-		return config;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Configured Properties
-	// ------------------------------------------------------------------------
-
-	public void setVertexID(Integer vertexID) {
-		config.setInteger(VERTEX_NAME, vertexID);
-	}
-
-	public Integer getVertexID() {
-		return config.getInteger(VERTEX_NAME, -1);
-	}
-	
-	public void setTypeSerializerIn1(TypeSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
-	}
-
-	public void setTypeSerializerIn2(TypeSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
-	}
-
-	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
-	}
-	
-	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_1, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
-	}
-	
-	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_2, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
-	}
-	
-	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
-	}
-
-	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
-		try {
-			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize type serializer.", e);
-		}
-	}
-
-	public void setBufferTimeout(long timeout) {
-		config.setLong(BUFFER_TIMEOUT, timeout);
-	}
-
-	public long getBufferTimeout() {
-		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
-	}
-
-	public void setStreamOperator(StreamOperator<?> operator) {
-		if (operator != null) {
-			config.setClass(USER_FUNCTION, operator.getClass());
-
-			try {
-				InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
-			} catch (IOException e) {
-				throw new StreamTaskException("Cannot serialize operator object "
-						+ operator.getClass() + ".", e);
-			}
-		}
-	}
-	
-	public <T> T getStreamOperator(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
-		}
-		catch (ClassNotFoundException e) {
-			String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
-			boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable(e, cl);
-			
-			String exceptionMessage = "Cannot load user class: " + e.getMessage()
-					+ "\nClassLoader info: " + classLoaderInfo + 
-					(loadableDoubleCheck ? 
-							"\nClass was actually found in classloader - deserialization issue." :
-							"\nClass not resolvable through given classloader.");
-			
-			throw new StreamTaskException(exceptionMessage);
-		}
-		catch (Exception e) {
-			throw new StreamTaskException("Cannot instantiate user function.", e);
-		}
-	}
-
-	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
-		}
-	}
-	
-	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
-		}
-	}
-
-	public void setIterationId(String iterationId) {
-		config.setString(ITERATION_ID, iterationId);
-	}
-
-	public String getIterationId() {
-		return config.getString(ITERATION_ID, "");
-	}
-
-	public void setIterationWaitTime(long time) {
-		config.setLong(ITERATON_WAIT, time);
-	}
-
-	public long getIterationWaitTime() {
-		return config.getLong(ITERATON_WAIT, 0);
-	}
-
-	public void setNumberOfInputs(int numberOfInputs) {
-		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
-	}
-
-	public int getNumberOfInputs() {
-		return config.getInteger(NUMBER_OF_INPUTS, 0);
-	}
-
-	public void setNumberOfOutputs(int numberOfOutputs) {
-		config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
-	}
-
-	public int getNumberOfOutputs() {
-		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
-	}
-
-	public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize non chained outputs.", e);
-		}
-	}
-	
-	public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
-		try {
-			List<StreamEdge> nonChainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
-			return nonChainedOutputs == null ?  new ArrayList<StreamEdge>() : nonChainedOutputs;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate non chained outputs.", e);
-		}
-	}
-
-	public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
-		try {
-			InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize chained outputs.", e);
-		}
-	}
-	
-	public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
-		try {
-			List<StreamEdge> chainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
-			return chainedOutputs == null ? new ArrayList<StreamEdge>() : chainedOutputs;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate chained outputs.", e);
-		}
-	}
-
-	public void setOutEdges(List<StreamEdge> outEdges) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outEdges, this.config, OUT_STREAM_EDGES);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize outward edges.", e);
-		}
-	}
-	
-	public List<StreamEdge> getOutEdges(ClassLoader cl) {
-		try {
-			List<StreamEdge> outEdges = InstantiationUtil.readObjectFromConfig(this.config, OUT_STREAM_EDGES, cl);
-			return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate outputs.", e);
-		}
-	}
-
-	public void setInPhysicalEdges(List<StreamEdge> inEdges) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize inward edges.", e);
-		}
-	}
-	
-	public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
-		try {
-			List<StreamEdge> inEdges = InstantiationUtil.readObjectFromConfig(this.config, IN_STREAM_EDGES, cl);
-			return inEdges == null ? new ArrayList<StreamEdge>() : inEdges;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate inputs.", e);
-		}
-	}
-
-	// --------------------- checkpointing -----------------------
-	
-	public void setCheckpointingEnabled(boolean enabled) {
-		config.setBoolean(CHECKPOINTING_ENABLED, enabled);
-	}
-
-	public boolean isCheckpointingEnabled() {
-		return config.getBoolean(CHECKPOINTING_ENABLED, false);
-	}
-	
-	public void setCheckpointMode(CheckpointingMode mode) {
-		config.setInteger(CHECKPOINT_MODE, mode.ordinal());
-	}
-
-	public CheckpointingMode getCheckpointMode() {
-		int ordinal = config.getInteger(CHECKPOINT_MODE, -1);
-		if (ordinal >= 0) {
-			return CheckpointingMode.values()[ordinal];
-		} else {
-			return DEFAULT_CHECKPOINTING_MODE; 
-		}
-	}
-	
-
-	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, EDGES_IN_ORDER);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize outputs in order.", e);
-		}
-	}
-	
-	public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
-		try {
-			List<StreamEdge> outEdgesInOrder = InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
-			return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate outputs in order.", e);
-		}
-	}
-
-	public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
-
-		try {
-			InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize configuration.", e);
-		}
-	}
-	
-	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
-		try {
-			Map<Integer, StreamConfig> confs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
-			return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate configuration.", e);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  State backend
-	// ------------------------------------------------------------------------
-	
-	public void setStateBackend(StateBackend<?> backend) {
-		try {
-			InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not serialize stateHandle provider.", e);
-		}
-	}
-	
-	public StateBackend<?> getStateBackend(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
-		}
-	}
-	
-	public void setStatePartitioner(KeySelector<?, ?> partitioner) {
-		try {
-			InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize state partitioner.", e);
-		}
-	}
-	
-	public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate state partitioner.", e);
-		}
-	}
-	
-	public void setStateKeySerializer(TypeSerializer<?> serializer) {
-		try {
-			InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize state key serializer.", e);
-		}
-	}
-
-	public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, STATE_KEY_SERIALIZER, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Miscellansous
-	// ------------------------------------------------------------------------
-	
-	public void setChainStart() {
-		config.setBoolean(IS_CHAINED_VERTEX, true);
-	}
-
-	public boolean isChainStart() {
-		return config.getBoolean(IS_CHAINED_VERTEX, false);
-	}
-
-	@Override
-	public String toString() {
-
-		ClassLoader cl = getClass().getClassLoader();
-
-		StringBuilder builder = new StringBuilder();
-		builder.append("\n=======================");
-		builder.append("Stream Config");
-		builder.append("=======================");
-		builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
-		builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
-		builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
-		builder.append("\nPartitioning:");
-		for (StreamEdge output : getNonChainedOutputs(cl)) {
-			int outputname = output.getTargetId();
-			builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
-		}
-
-		builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
-
-		try {
-			builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
-		}
-		catch (Exception e) {
-			builder.append("\nOperator: Missing");
-		}
-		builder.append("\nBuffer timeout: ").append(getBufferTimeout());
-		builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
-		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
-			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
-			builder.append(getTransitiveChainedTaskConfigs(cl));
-		}
-
-		return builder.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
deleted file mode 100644
index c252870..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * An edge in the streaming topology. One edge like this does not necessarily
- * gets converted to a connection between two job vertices (due to
- * chaining/optimization).
- */
-public class StreamEdge implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	final private String edgeId;
-
-	final private StreamNode sourceVertex;
-	final private StreamNode targetVertex;
-
-	/**
-	 * The type number of the input for co-tasks.
-	 */
-	final private int typeNumber;
-
-	/**
-	 * A list of output names that the target vertex listens to (if there is
-	 * output selection).
-	 */
-	private final List<String> selectedNames;
-	private StreamPartitioner<?> outputPartitioner;
-
-	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
-		this.sourceVertex = sourceVertex;
-		this.targetVertex = targetVertex;
-		this.typeNumber = typeNumber;
-		this.selectedNames = selectedNames;
-		this.outputPartitioner = outputPartitioner;
-
-		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
-				+ "_" + outputPartitioner;
-	}
-
-	public StreamNode getSourceVertex() {
-		return sourceVertex;
-	}
-
-	public StreamNode getTargetVertex() {
-		return targetVertex;
-	}
-
-	public int getSourceId() {
-		return sourceVertex.getId();
-	}
-
-	public int getTargetId() {
-		return targetVertex.getId();
-	}
-
-	public int getTypeNumber() {
-		return typeNumber;
-	}
-
-	public List<String> getSelectedNames() {
-		return selectedNames;
-	}
-
-	public StreamPartitioner<?> getPartitioner() {
-		return outputPartitioner;
-	}
-	
-	public void setPartitioner(StreamPartitioner<?> partitioner) {
-		this.outputPartitioner = partitioner;
-	}
-
-	@Override
-	public int hashCode() {
-		return edgeId.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		StreamEdge that = (StreamEdge) o;
-
-		return edgeId.equals(that.edgeId);
-	}
-
-	@Override
-	public String toString() {
-		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
-				+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
-				+ ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
deleted file mode 100644
index be020d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-import org.apache.sling.commons.json.JSONException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class representing the streaming topology. It contains all the information
- * necessary to build the jobgraph for the execution.
- *
- * <p>
- * Besides the real {@link StreamNode StreamNodes}, the graph tracks "virtual" select and
- * partition nodes. A virtual node only records the ID of the node it stands for together
- * with the selected output names or the partitioner. When an edge is added from a virtual
- * node it is resolved to the original node and the recorded property is attached to the
- * created {@link StreamEdge}.
- */
-public class StreamGraph extends StreamingPlan {
-
-	/** The default interval for checkpoints, in milliseconds */
-	public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
-
-	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
-
-	private final StreamExecutionEnvironment environment;
-	private final ExecutionConfig executionConfig;
-
-	private CheckpointingMode checkpointingMode;
-	private boolean checkpointingEnabled = false;
-	private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
-	private boolean chaining = true;
-
-	private Map<Integer, StreamNode> streamNodes;
-	private Set<Integer> sources;
-	private Set<Integer> sinks;
-	// virtual node ID -> (original node ID, selected output names)
-	private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
-	// virtual node ID -> (original node ID, partitioner)
-	private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
-
-	protected Map<Integer, String> vertexIDtoBrokerID;
-	protected Map<Integer, Long> vertexIDtoLoopTimeout;
-	private StateBackend<?> stateBackend;
-	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
-
-	private boolean forceCheckpoint = false;
-
-	/**
-	 * Creates an empty graph that takes its {@link ExecutionConfig} from the given environment.
-	 *
-	 * @param environment the environment the graph belongs to
-	 */
-	public StreamGraph(StreamExecutionEnvironment environment) {
-
-		this.environment = environment;
-		this.executionConfig = environment.getConfig();
-
-		// create an empty new stream graph.
-		clear();
-	}
-
-	/**
-	 * Remove all registered nodes etc.
-	 *
-	 * <p>
-	 * This replaces the node, virtual node, broker ID, loop timeout, iteration pair, source
-	 * and sink collections with fresh empty ones. Job name, chaining, checkpointing settings
-	 * and the state backend are left untouched.
-	 */
-	public void clear() {
-		streamNodes = Maps.newHashMap();
-		virtualSelectNodes = Maps.newHashMap();
-		virtualPartitionNodes = Maps.newHashMap();
-		vertexIDtoBrokerID = Maps.newHashMap();
-		vertexIDtoLoopTimeout = Maps.newHashMap();
-		iterationSourceSinkPairs = Sets.newHashSet();
-		sources = Sets.newHashSet();
-		sinks = Sets.newHashSet();
-	}
-
-	protected ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Job-level settings
-	// ------------------------------------------------------------------------
-
-	public void setJobName(String jobName) {
-		this.jobName = jobName;
-	}
-
-	public void setChaining(boolean chaining) {
-		this.chaining = chaining;
-	}
-
-	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-		this.checkpointingEnabled = checkpointingEnabled;
-	}
-
-	public void setCheckpointingInterval(long checkpointingInterval) {
-		this.checkpointingInterval = checkpointingInterval;
-	}
-
-	/**
-	 * Allows checkpointing for iterative jobs; see {@link #getJobGraph(String)}.
-	 */
-	public void forceCheckpoint() {
-		this.forceCheckpoint = true;
-	}
-
-	public void setStateBackend(StateBackend<?> backend) {
-		this.stateBackend = backend;
-	}
-
-	public StateBackend<?> getStateBackend() {
-		return this.stateBackend;
-	}
-
-	public long getCheckpointingInterval() {
-		return checkpointingInterval;
-	}
-
-	public boolean isChainingEnabled() {
-		return chaining;
-	}
-
-	public boolean isCheckpointingEnabled() {
-		return checkpointingEnabled;
-	}
-
-	public CheckpointingMode getCheckpointingMode() {
-		return checkpointingMode;
-	}
-
-	public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
-		this.checkpointingMode = checkpointingMode;
-	}
-
-	/**
-	 * Returns {@code true} if at least one loop timeout has been registered, which happens in
-	 * {@link #createIterationSourceAndSink}.
-	 */
-	public boolean isIterative() {
-		return !vertexIDtoLoopTimeout.isEmpty();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Adding nodes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Adds an operator via {@link #addOperator} and registers its ID as a source.
-	 */
-	public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
-			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-		addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
-		sources.add(vertexID);
-	}
-
-	/**
-	 * Adds an operator via {@link #addOperator} and registers its ID as a sink.
-	 */
-	public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
-			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-		addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
-		sinks.add(vertexID);
-	}
-
-	/**
-	 * Adds a node for a one-input operator or a source and configures its serializers.
-	 *
-	 * <p>
-	 * A {@link StreamSource} operator is run in a {@link SourceStreamTask}, everything else in
-	 * a {@link OneInputStreamTask}. A serializer is only created for type information that
-	 * is neither {@code null} nor a {@link MissingTypeInfo}; otherwise {@code null} is set.
-	 *
-	 * @param vertexID ID of the new node
-	 * @param operatorObject the operator
-	 * @param inTypeInfo input type information, may be {@code null}
-	 * @param outTypeInfo output type information, may be {@code null}
-	 * @param operatorName name of the operator
-	 * @throws RuntimeException if a node with the given ID already exists
-	 */
-	public <IN, OUT> void addOperator(
-			Integer vertexID,
-			StreamOperator<OUT> operatorObject,
-			TypeInformation<IN> inTypeInfo,
-			TypeInformation<OUT> outTypeInfo,
-			String operatorName) {
-
-		if (operatorObject instanceof StreamSource) {
-			addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
-		} else {
-			addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
-		}
-
-		TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ?
-				inTypeInfo.createSerializer(executionConfig) : null;
-
-		TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ?
-				outTypeInfo.createSerializer(executionConfig) : null;
-
-		setSerializers(vertexID, inSerializer, null, outSerializer);
-
-		if (operatorObject instanceof OutputTypeConfigurable) {
-			@SuppressWarnings("unchecked")
-			OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
-			// sets the output type which must be known at StreamGraph creation time
-			outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
-		}
-
-		if (operatorObject instanceof InputTypeConfigurable) {
-			InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
-			inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);
-		}
-
-		// parameterized logging needs no isDebugEnabled() guard
-		LOG.debug("Vertex: {}", vertexID);
-	}
-
-	/**
-	 * Adds a node for a two-input operator, run in a {@link TwoInputStreamTask}.
-	 *
-	 * <p>
-	 * Both input type informations are dereferenced unconditionally and therefore must not be
-	 * {@code null}. The output serializer is only created if {@code outTypeInfo} is neither
-	 * {@code null} nor a {@link MissingTypeInfo}.
-	 *
-	 * @throws RuntimeException if a node with the given ID already exists
-	 */
-	public <IN1, IN2, OUT> void addCoOperator(
-			Integer vertexID,
-			TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
-			TypeInformation<IN1> in1TypeInfo,
-			TypeInformation<IN2> in2TypeInfo,
-			TypeInformation<OUT> outTypeInfo,
-			String operatorName) {
-
-		addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
-
-		TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
-				outTypeInfo.createSerializer(executionConfig) : null;
-
-		setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer);
-
-		if (taskOperatorObject instanceof OutputTypeConfigurable) {
-			@SuppressWarnings("unchecked")
-			OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject;
-			// sets the output type which must be known at StreamGraph creation time
-			outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
-		}
-
-		LOG.debug("CO-TASK: {}", vertexID);
-	}
-
-	/**
-	 * Creates a {@link StreamNode} and registers it under the given ID.
-	 *
-	 * @return the newly created node
-	 * @throws RuntimeException if a node with the given ID already exists
-	 */
-	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
-			StreamOperator<?> operatorObject, String operatorName) {
-
-		if (streamNodes.containsKey(vertexID)) {
-			throw new RuntimeException("Duplicate vertexID " + vertexID);
-		}
-
-		StreamNode vertex = new StreamNode(environment, vertexID, operatorObject, operatorName,
-				new ArrayList<OutputSelector<?>>(), vertexClass);
-
-		streamNodes.put(vertexID, vertex);
-
-		return vertex;
-	}
-
-	/**
-	 * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
-	 * with the selected names.
-	 *
-	 * When adding an edge from the virtual node to a downstream node the connection will be made
-	 * to the original node, only with the selected names given here.
-	 *
-	 * @param originalId ID of the node that should be connected to.
-	 * @param virtualId ID of the virtual node.
-	 * @param selectedNames The selected names.
-	 * @throws IllegalStateException if a virtual select node with that ID already exists
-	 */
-	public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
-
-		if (virtualSelectNodes.containsKey(virtualId)) {
-			throw new IllegalStateException("Already has virtual select node with id " + virtualId);
-		}
-
-		virtualSelectNodes.put(virtualId,
-				new Tuple2<Integer, List<String>>(originalId, selectedNames));
-	}
-
-	/**
-	 * Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
-	 * partitioning.
-	 *
-	 * When adding an edge from the virtual node to a downstream node the connection will be made
-	 * to the original node, but with the partitioning given here.
-	 *
-	 * @param originalId ID of the node that should be connected to.
-	 * @param virtualId ID of the virtual node.
-	 * @param partitioner The partitioner
-	 * @throws IllegalStateException if a virtual partition node with that ID already exists
-	 */
-	public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
-
-		if (virtualPartitionNodes.containsKey(virtualId)) {
-			throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
-		}
-
-		virtualPartitionNodes.put(virtualId,
-				new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
-	}
-
-	// ------------------------------------------------------------------------
-	//  Adding edges
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Adds an edge between two nodes. If the upstream ID refers to a virtual node, the edge is
-	 * attached to the original node with the properties recorded for the virtual node.
-	 *
-	 * @param upStreamVertexID ID of the (possibly virtual) upstream node
-	 * @param downStreamVertexID ID of the downstream node
-	 * @param typeNumber type number stored on the created edge
-	 */
-	public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
-		addEdgeInternal(upStreamVertexID,
-				downStreamVertexID,
-				typeNumber,
-				null,
-				Lists.<String>newArrayList());
-
-	}
-
-	/**
-	 * Resolves virtual upstream nodes recursively and finally creates the {@link StreamEdge}.
-	 *
-	 * @param partitioner partitioner collected so far, {@code null} if none was found yet
-	 * @param outputNames selected names collected so far, empty if none were found yet
-	 */
-	private void addEdgeInternal(Integer upStreamVertexID,
-			Integer downStreamVertexID,
-			int typeNumber,
-			StreamPartitioner<?> partitioner,
-			List<String> outputNames) {
-
-
-		if (virtualSelectNodes.containsKey(upStreamVertexID)) {
-			int virtualId = upStreamVertexID;
-			upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
-			if (outputNames.isEmpty()) {
-				// selections that happen downstream override earlier selections
-				outputNames = virtualSelectNodes.get(virtualId).f1;
-			}
-			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
-		} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
-			int virtualId = upStreamVertexID;
-			upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
-			if (partitioner == null) {
-				// only the partitioner closest to the downstream node is used
-				partitioner = virtualPartitionNodes.get(virtualId).f1;
-			}
-			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
-		} else {
-			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
-			StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
-			// If no partitioner was specified and the parallelism of upstream and downstream
-			// operator matches use forward partitioning, use rebalance otherwise.
-			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
-				partitioner = new ForwardPartitioner<Object>();
-			} else if (partitioner == null) {
-				partitioner = new RebalancePartitioner<Object>();
-			}
-
-			if (partitioner instanceof ForwardPartitioner) {
-				if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
-					throw new UnsupportedOperationException("Forward partitioning does not allow " +
-							"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
-							", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
-							" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
-				}
-			}
-
-			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
-
-			getStreamNode(edge.getSourceId()).addOutEdge(edge);
-			getStreamNode(edge.getTargetId()).addInEdge(edge);
-		}
-	}
-
-	/**
-	 * Adds an output selector to a node. Virtual partition and select nodes are resolved to
-	 * the original node first.
-	 */
-	public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
-		if (virtualPartitionNodes.containsKey(vertexID)) {
-			addOutputSelector(virtualPartitionNodes.get(vertexID).f0, outputSelector);
-		} else if (virtualSelectNodes.containsKey(vertexID)) {
-			addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
-		} else {
-			getStreamNode(vertexID).addOutputSelector(outputSelector);
-
-			LOG.debug("Outputselector set for {}", vertexID);
-		}
-
-	}
-
-	// ------------------------------------------------------------------------
-	//  Per-node settings
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Sets the parallelism of a node. Does nothing if no node is registered under the ID,
-	 * which is the case for virtual nodes.
-	 */
-	public void setParallelism(Integer vertexID, int parallelism) {
-		StreamNode node = getStreamNode(vertexID);
-		if (node != null) {
-			node.setParallelism(parallelism);
-		}
-	}
-
-	public void setKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
-		StreamNode node = getStreamNode(vertexID);
-		node.setStatePartitioner(keySelector);
-		node.setStateKeySerializer(keySerializer);
-	}
-
-	/**
-	 * Sets the buffer timeout of a node. Does nothing if no node is registered under the ID,
-	 * which is the case for virtual nodes.
-	 */
-	public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
-		StreamNode node = getStreamNode(vertexID);
-		if (node != null) {
-			node.setBufferTimeout(bufferTimeout);
-		}
-	}
-
-	public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
-		StreamNode vertex = getStreamNode(vertexID);
-		vertex.setSerializerIn1(in1);
-		vertex.setSerializerIn2(in2);
-		vertex.setSerializerOut(out);
-	}
-
-	/**
-	 * Copies serializers from one node to another: the first input serializer of {@code to}
-	 * becomes the output serializer of {@code from}, and the output serializer of {@code to}
-	 * becomes the first input serializer of {@code from}.
-	 *
-	 * <p>
-	 * NOTE(review): the output of {@code to} is taken from the <i>input</i> of {@code from};
-	 * confirm that this crossing is intended.
-	 */
-	public void setSerializersFrom(Integer from, Integer to) {
-		StreamNode fromVertex = getStreamNode(from);
-		StreamNode toVertex = getStreamNode(to);
-
-		toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
-		toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
-	}
-
-	public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
-		getStreamNode(vertexID).setSerializerOut(outType.createSerializer(executionConfig));
-	}
-
-	public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<OUT> operatorObject) {
-		getStreamNode(vertexID).setOperator(operatorObject);
-	}
-
-	public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
-		getStreamNode(vertexID).setInputFormat(inputFormat);
-	}
-
-	/**
-	 * Applies a resource strategy to a node. Does nothing if no node is registered under the
-	 * ID.
-	 *
-	 * @throws IllegalArgumentException for any strategy other than {@code ISOLATE} and
-	 *             {@code NEWGROUP}, including {@code DEFAULT}
-	 */
-	public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
-		StreamNode node = getStreamNode(vertexID);
-		if (node == null) {
-			return;
-		}
-
-		switch (strategy) {
-		case ISOLATE:
-			node.isolateSlot();
-			break;
-		case NEWGROUP:
-			node.startNewSlotSharingGroup();
-			break;
-		default:
-			throw new IllegalArgumentException("Unknown resource strategy");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Accessing the graph
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the node registered under the given ID, or {@code null} if there is none.
-	 */
-	public StreamNode getStreamNode(Integer vertexID) {
-		return streamNodes.get(vertexID);
-	}
-
-	protected Collection<? extends Integer> getVertexIDs() {
-		return streamNodes.keySet();
-	}
-
-	/**
-	 * Returns the first out-edge of the source node that ends at the target node.
-	 *
-	 * @throws RuntimeException if the source node has no edge to the target node
-	 */
-	public StreamEdge getStreamEdge(int sourceId, int targetId) {
-		Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
-		while (outIterator.hasNext()) {
-			StreamEdge edge = outIterator.next();
-
-			if (edge.getTargetId() == targetId) {
-				return edge;
-			}
-		}
-
-		throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
-	}
-
-	public Collection<Integer> getSourceIDs() {
-		return sources;
-	}
-
-
-	public Collection<Integer> getSinkIDs() {
-		return sinks;
-	}
-
-	public Collection<StreamNode> getStreamNodes() {
-		return streamNodes.values();
-	}
-
-	/**
-	 * Returns a new set with one (node ID, operator) pair per registered node.
-	 */
-	public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
-		Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?>>>();
-		for (StreamNode vertex : streamNodes.values()) {
-			operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getId(), vertex
-					.getOperator()));
-		}
-		return operatorSet;
-	}
-
-	/**
-	 * Returns the broker ID registered for the node, or {@code null} if there is none.
-	 */
-	public String getBrokerID(Integer vertexID) {
-		return vertexIDtoBrokerID.get(vertexID);
-	}
-
-	/**
-	 * Returns the loop timeout registered for the node.
-	 *
-	 * @throws NullPointerException if no timeout is registered for the ID, because the
-	 *             missing map entry cannot be unboxed to {@code long}
-	 */
-	public long getLoopTimeout(Integer vertexID) {
-		return vertexIDtoLoopTimeout.get(vertexID);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Iterations
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates the head ({@link StreamIterationHead}) and tail ({@link StreamIterationTail})
-	 * nodes of an iteration. Both get the given parallelism, the broker ID
-	 * {@code "broker-" + loopId} and the given timeout, and are registered as source and sink.
-	 *
-	 * @return the (source, sink) pair
-	 * @throws RuntimeException if one of the IDs is already used by another node
-	 */
-	public  Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
-
-		StreamNode source = this.addNode(sourceId,
-				StreamIterationHead.class,
-				null,
-				null);
-		sources.add(source.getId());
-		setParallelism(source.getId(), parallelism);
-
-		StreamNode sink = this.addNode(sinkId,
-				StreamIterationTail.class,
-				null,
-				null);
-		sinks.add(sink.getId());
-		setParallelism(sink.getId(), parallelism);
-
-		iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));
-
-		source.setOperatorName("IterationSource-" + loopId);
-		sink.setOperatorName("IterationSink-" + loopId);
-		this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
-		this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
-		this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
-		this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);
-
-		return new Tuple2<StreamNode, StreamNode>(source, sink);
-	}
-
-	public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
-		return iterationSourceSinkPairs;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Removing nodes and edges
-	// ------------------------------------------------------------------------
-
-	protected void removeEdge(StreamEdge edge) {
-
-		edge.getSourceVertex().getOutEdges().remove(edge);
-		edge.getTargetVertex().getInEdges().remove(edge);
-
-	}
-
-	/**
-	 * Removes a node together with all its in- and out-edges.
-	 */
-	protected void removeVertex(StreamNode toRemove) {
-
-		// copy first: removeEdge() modifies the edge lists of the node
-		Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
-
-		edgesToRemove.addAll(toRemove.getInEdges());
-		edgesToRemove.addAll(toRemove.getOutEdges());
-
-		for (StreamEdge edge : edgesToRemove) {
-			removeEdge(edge);
-		}
-		streamNodes.remove(toRemove.getId());
-	}
-
-	// ------------------------------------------------------------------------
-	//  Translation and plan output
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the assembled {@link JobGraph} and adds a default name for it.
-	 */
-	public JobGraph getJobGraph() {
-		return getJobGraph(jobName);
-	}
-
-	/**
-	 * Gets the assembled {@link JobGraph} and adds a user specified name for
-	 * it. The name is also stored as the job name of this graph.
-	 *
-	 * @param jobGraphName
-	 *            name of the jobGraph
-	 * @throws UnsupportedOperationException
-	 *             if the job is iterative and checkpointing is enabled without
-	 *             having been forced
-	 */
-	public JobGraph getJobGraph(String jobGraphName) {
-		// temporarily forbid checkpointing for iterative jobs
-		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
-			throw new UnsupportedOperationException(
-					"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
-							+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
-							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
-		}
-
-		setJobName(jobGraphName);
-
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
-
-		return jobgraphGenerator.createJobGraph(jobGraphName);
-	}
-
-	/**
-	 * Returns the JSON representation of this graph, or an empty string if the JSON could not
-	 * be generated.
-	 */
-	@Override
-	public String getStreamingPlanAsJSON() {
-
-		try {
-			return new JSONGenerator(this).getJSON();
-		} catch (JSONException e) {
-			// Best effort: the plan is informational only. The exception is passed as the
-			// throwable argument; a "{}" placeholder would stay unfilled for a Throwable.
-			LOG.debug("JSON plan creation failed", e);
-			return "";
-		}
-
-	}
-
-	/**
-	 * Writes the JSON representation of this graph to the given file, using the platform
-	 * default charset.
-	 *
-	 * @param file the file to write to
-	 * @throws IOException if the file cannot be opened or the plan cannot be written
-	 */
-	@Override
-	public void dumpStreamingPlanAsJSON(File file) throws IOException {
-		try (PrintWriter pw = new PrintWriter(new FileOutputStream(file), false)) {
-			pw.write(getStreamingPlanAsJSON());
-
-			// PrintWriter never throws on write errors; checkError() flushes the stream and
-			// reports whether any write failed, so that failures are not silently lost.
-			if (pw.checkError()) {
-				throw new IOException("Could not write streaming plan to file " + file);
-			}
-		}
-	}
-
-	/**
-	 * Resource strategies that can be applied to a node via
-	 * {@link StreamGraph#setResourceStrategy}.
-	 */
-	public enum ResourceStrategy {
-		DEFAULT, ISOLATE, NEWGROUP
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
deleted file mode 100644
index 4a87eb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
-import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A generator that generates a {@link StreamGraph} from a graph of
- * {@link StreamTransformation StreamTransformations}.
- *
- * <p>
- * This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
- * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
- * and add edges from the input Nodes to our newly created node. The transformation methods
- * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
- * IDs can be returned to be able to deal with feedback transformations and unions.
- *
- * <p>
- * Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
- * these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
- * partitioning, selector and so on. When an edge is created from a virtual node to a downstream
- * node the {@code StreamGraph} resolves the id of the original node and creates an edge
- * in the graph with the desired property. For example, if you have this graph:
- *
- * <pre>
- *     Map-1 -> HashPartition-2 -> Map-3
- * </pre>
- *
- * where the numbers represent transformation IDs. We first recurse all the way down. {@code Map-1}
- * is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
- * {@code HashPartition}, for this, we create a virtual node with ID 4 that holds the property
- * {@code HashPartition}. This transformation returns the ID 4. Then we transform the {@code Map-3}.
- * We add the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and
- * creates an edge {@code 1 -> 3} with the property HashPartition.
- */
-public class StreamGraphGenerator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
-
-	// The StreamGraph that is being built, this is initialized at the beginning.
-	private StreamGraph streamGraph;
-
-	private final StreamExecutionEnvironment env;
-
-	// Counter backing the IDs handed out to iteration sources/sinks. It starts at zero and
-	// is only ever decremented, so the IDs produced here are negative.
-	protected static Integer iterationIdCounter = 0;
-
-	/**
-	 * Decrements the shared counter and returns its new value.
-	 */
-	public static int getNewIterationNodeId() {
-		return --iterationIdCounter;
-	}
-
-	// Keep track of which Transforms we have already transformed, this is necessary because
-	// we have loops, i.e. feedback edges.
-	private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
-
-
-	/**
-	 * Private constructor. The generator should only be invoked using {@link #generate}.
-	 *
-	 * <p>
-	 * Creates a fresh {@code StreamGraph} and copies the chaining flag, the state backend and,
-	 * if the checkpoint interval of the environment is positive, the checkpointing settings
-	 * from the environment into it.
-	 */
-	private StreamGraphGenerator(StreamExecutionEnvironment env) {
-		this.env = env;
-		this.alreadyTransformed = new HashMap<>();
-
-		StreamGraph graph = new StreamGraph(env);
-		graph.setChaining(env.isChainingEnabled());
-
-		boolean checkpointingRequested = env.getCheckpointInterval() > 0;
-		if (checkpointingRequested) {
-			graph.setCheckpointingEnabled(true);
-			graph.setCheckpointingInterval(env.getCheckpointInterval());
-			graph.setCheckpointingMode(env.getCheckpointingMode());
-		}
-
-		graph.setStateBackend(env.getStateBackend());
-
-		if (env.isForceCheckpointing()) {
-			graph.forceCheckpoint();
-		}
-
-		this.streamGraph = graph;
-	}
-
-	/**
-	 * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
-	 * starting from the given transformations.
-	 *
-	 * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
-	 *            job
-	 * @param transformations The transformations starting from which to transform the graph
-	 *
-	 * @return The generated {@code StreamGraph}
-	 */
-	public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
-		return new StreamGraphGenerator(env).generateInternal(transformations);
-	}
-
-	/**
-	 * Runs {@link #transform} on every given transformation, in list order, and returns the
-	 * graph that was populated along the way.
-	 */
-	private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
-		for (StreamTransformation<?> root : transformations) {
-			transform(root);
-		}
-
-		return this.streamGraph;
-	}
-
-	/**
-	 * Transforms one {@code StreamTransformation}.
-	 *
-	 * <p>
-	 * This checks whether we already transformed it and exits early in that case. If not it
-	 * delegates to one of the transformation specific methods. Afterwards the buffer timeout
-	 * (if positive) and the resource strategy (if not {@code DEFAULT}) of the transformation
-	 * are applied to the graph under the ID of the transformation.
-	 *
-	 * @param transform the transformation to translate
-	 * @return the IDs produced for the transformation by the transformation specific method,
-	 *         or the IDs recorded earlier if it had already been transformed
-	 * @throws IllegalStateException if the type of the transformation is not known
-	 */
-	private Collection<Integer> transform(StreamTransformation<?> transform) {
-
-		if (alreadyTransformed.containsKey(transform)) {
-			return alreadyTransformed.get(transform);
-		}
-
-		// parameterized, so that transform.toString() is only evaluated if debug logging is on
-		LOG.debug("Transforming {}", transform);
-
-		// call at least once to trigger exceptions about MissingTypeInfo
-		transform.getOutputType();
-
-		Collection<Integer> transformedIds;
-		if (transform instanceof OneInputTransformation<?, ?>) {
-			transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
-		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
-			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
-		} else if (transform instanceof SourceTransformation<?>) {
-			transformedIds = transformSource((SourceTransformation<?>) transform);
-		} else if (transform instanceof SinkTransformation<?>) {
-			transformedIds = transformSink((SinkTransformation<?>) transform);
-		} else if (transform instanceof UnionTransformation<?>) {
-			transformedIds = transformUnion((UnionTransformation<?>) transform);
-		} else if (transform instanceof SplitTransformation<?>) {
-			transformedIds = transformSplit((SplitTransformation<?>) transform);
-		} else if (transform instanceof SelectTransformation<?>) {
-			transformedIds = transformSelect((SelectTransformation<?>) transform);
-		} else if (transform instanceof FeedbackTransformation<?>) {
-			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
-		} else if (transform instanceof CoFeedbackTransformation<?>) {
-			transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
-		} else if (transform instanceof PartitionTransformation<?>) {
-			transformedIds = transformPartition((PartitionTransformation<?>) transform);
-		} else {
-			throw new IllegalStateException("Unknown transformation: " + transform);
-		}
-
-		// need this check because the iterate transformation adds itself before
-		// transforming the feedback edges
-		if (!alreadyTransformed.containsKey(transform)) {
-			alreadyTransformed.put(transform, transformedIds);
-		}
-
-		if (transform.getBufferTimeout() > 0) {
-			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
-		}
-		if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
-			streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
-		}
-
-		return transformedIds;
-	}
-
-	/**
-	 * Transforms a {@code UnionTransformation}.
-	 *
-	 * <p>
-	 * A union creates nothing of its own: every input is transformed and the IDs returned
-	 * for the inputs are concatenated, in input order, so that downstream operations can
-	 * connect to all upstream nodes.
-	 */
-	private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
-		List<Integer> upstreamIds = new ArrayList<>();
-
-		for (StreamTransformation<T> unionInput : union.getInputs()) {
-			Collection<Integer> idsOfInput = transform(unionInput);
-			upstreamIds.addAll(idsOfInput);
-		}
-
-		return upstreamIds;
-	}
-
-	/**
-	 * Transforms a {@code PartitionTransformation}.
-	 *
-	 * <p>
-	 * The input is transformed first. For every ID returned for the input, a virtual
-	 * partition node with a newly drawn ID is registered in the {@code StreamGraph}; it
-	 * carries the partitioner of the transformation. The IDs of the virtual nodes are
-	 * returned. @see StreamGraphGenerator
-	 */
-	private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
-		Collection<Integer> upstreamIds = transform(partition.getInput());
-
-		List<Integer> virtualIds = new ArrayList<>();
-		for (Integer upstreamId : upstreamIds) {
-			int virtualId = StreamTransformation.getNewNodeId();
-			streamGraph.addVirtualPartitionNode(upstreamId, virtualId, partition.getPartitioner());
-			virtualIds.add(virtualId);
-		}
-
-		return virtualIds;
-	}
-
-	/**
-	 * Transforms a {@code SplitTransformation}.
-	 *
-	 * <p>
-	 * We add the output selector to previously transformed nodes.
-	 */
-	private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
-
-		StreamTransformation<T> input = split.getInput();
-		Collection<Integer> resultIds = transform(input);
-
-		// the recursive transform call might have transformed this already
-		if (alreadyTransformed.containsKey(split)) {
-			return alreadyTransformed.get(split);
-		}
-
-		for (int inputId : resultIds) {
-			streamGraph.addOutputSelector(inputId, split.getOutputSelector());
-		}
-
-
-		return resultIds;
-	}
-
-	/**
-	 * Transforms a {@code SelectTransformation}.
-	 *
-	 * <p>
-	 * For this we create a virtual node in the {@code StreamGraph} that holds the selected names.
-	 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
-	 */
-	private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
-		StreamTransformation<T> input = select.getInput();
-		Collection<Integer> resultIds = transform(input);
-
-
-		// the recursive transform might have already transformed this
-		if (alreadyTransformed.containsKey(select)) {
-			return alreadyTransformed.get(select);
-		}
-
-		List<Integer> virtualResultIds = new ArrayList<>();
-
-		for (int inputId : resultIds) {
-			int virtualId = StreamTransformation.getNewNodeId();
-			streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
-			virtualResultIds.add(virtualId);
-		}
-		return virtualResultIds;
-	}
-
-	/**
-	 * Transforms a {@code FeedbackTransformation}.
-	 *
-	 * <p>
-	 * This will recursively transform the input and the feedback edges. We return the concatenation
-	 * of the input IDs and the feedback IDs so that downstream operations can be wired to both.
-	 *
-	 * <p>
-	 * This is responsible for creating the IterationSource and IterationSink which
-	 * are used to feed back the elements.
-	 */
-	private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
-
-		if (iterate.getFeedbackEdges().size() <= 0) {
-			throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
-		}
-
-		StreamTransformation<T> input = iterate.getInput();
-		List<Integer> resultIds = new ArrayList<>();
-
-		// first transform the input stream(s) and store the result IDs
-		resultIds.addAll(transform(input));
-
-		// the recursive transform might have already transformed this
-		if (alreadyTransformed.containsKey(iterate)) {
-			return alreadyTransformed.get(iterate);
-		}
-
-
-		// create the fake iteration source/sink pair
-		Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
-				iterate.getId(),
-				getNewIterationNodeId(),
-				getNewIterationNodeId(),
-				iterate.getWaitTime(),
-				iterate.getParallelism());
-
-		StreamNode itSource = itSourceAndSink.f0;
-		StreamNode itSink = itSourceAndSink.f1;
-
-		// We set the proper serializers for the sink/source
-		streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
-		streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);
-
-		// also add the feedback source ID to the result IDs, so that downstream operators will
-		// add both as input
-		resultIds.add(itSource.getId());
-
-		// add the iterate to the already-seen set with the result IDs, so that we can transform
-		// the feedback edges and let them stop when encountering the iterate node
-		alreadyTransformed.put(iterate, resultIds);
-
-		for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
-			Collection<Integer> feedbackIds = transform(feedbackEdge);
-			for (Integer feedbackId: feedbackIds) {
-				streamGraph.addEdge(feedbackId,
-						itSink.getId(),
-						0
-				);
-			}
-		}
-
-		return resultIds;
-	}
-
-	/**
-	 * Transforms a {@code CoFeedbackTransformation}.
-	 *
-	 * <p>
-	 * This will only transform feedback edges, the result of this transform will be wired
-	 * to the second input of a Co-Transform. The original input is wired directly to the first
-	 * input of the downstream Co-Transform.
-	 *
-	 * <p>
-	 * This is responsible for creating the IterationSource and IterationSink which
-	 * are used to feed back the elements.
-	 */
-	private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
-
-		// For Co-Iteration we don't need to transform the input and wire the input to the
-		// head operator by returning the input IDs, the input is directly wired to the left
-		// input of the co-operation. This transform only needs to return the ids of the feedback
-		// edges, since they need to be wired to the second input of the co-operation.
-
-		// create the fake iteration source/sink pair
-		Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
-				coIterate.getId(),
-				getNewIterationNodeId(),
-				getNewIterationNodeId(),
-				coIterate.getWaitTime(),
-				coIterate.getParallelism());
-
-		StreamNode itSource = itSourceAndSink.f0;
-		StreamNode itSink = itSourceAndSink.f1;
-
-		// We set the proper serializers for the sink/source
-		streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(env.getConfig()));
-		streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(env.getConfig()), null, null);
-
-		Collection<Integer> resultIds = Collections.singleton(itSource.getId());
-
-		// add the iterate to the already-seen set with the result IDs, so that we can transform
-		// the feedback edges and let them stop when encountering the iterate node
-		alreadyTransformed.put(coIterate, resultIds);
-
-		for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
-			Collection<Integer> feedbackIds = transform(feedbackEdge);
-			for (Integer feedbackId: feedbackIds) {
-				streamGraph.addEdge(feedbackId,
-						itSink.getId(),
-						0
-				);
-			}
-		}
-
-		return Collections.singleton(itSource.getId());
-	}
-
-	/**
-	 * Transforms a {@code SourceTransformation}.
-	 */
-	private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
-		streamGraph.addSource(source.getId(),
-				source.getOperator(),
-				null,
-				source.getOutputType(),
-				"Source: " + source.getName());
-		if (source.getOperator().getUserFunction() instanceof FileSourceFunction) {
-			FileSourceFunction<T> fs = (FileSourceFunction<T>) source.getOperator().getUserFunction();
-			streamGraph.setInputFormat(source.getId(), fs.getFormat());
-		}
-		streamGraph.setParallelism(source.getId(), source.getParallelism());
-		return Collections.singleton(source.getId());
-	}
-
-	/**
-	 * Transforms a {@code SinkTransformation}.
-	 */
-	private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
-
-		Collection<Integer> inputIds = transform(sink.getInput());
-
-		streamGraph.addSink(sink.getId(),
-				sink.getOperator(),
-				sink.getInput().getOutputType(),
-				null,
-				"Sink: " + sink.getName());
-
-		streamGraph.setParallelism(sink.getId(), sink.getParallelism());
-
-		for (Integer inputId: inputIds) {
-			streamGraph.addEdge(inputId,
-					sink.getId(),
-					0
-			);
-		}
-
-
-		if (sink.getStateKeySelector() != null) {
-			TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig());
-			streamGraph.setKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
-		}
-
-		return Collections.emptyList();
-	}
-
-	/**
-	 * Transforms a {@code OneInputTransformation}.
-	 *
-	 * <p>
-	 * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
-	 * wires the inputs to this new node.
-	 */
-	private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
-
-		Collection<Integer> inputIds = transform(transform.getInput());
-
-		// the recursive call might have already transformed this
-		if (alreadyTransformed.containsKey(transform)) {
-			return alreadyTransformed.get(transform);
-		}
-
-		streamGraph.addOperator(transform.getId(),
-				transform.getOperator(),
-				transform.getInputType(),
-				transform.getOutputType(),
-				transform.getName());
-
-		if (transform.getStateKeySelector() != null) {
-			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
-			streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
-		}
-		if (transform.getStateKeyType() != null) {
-			
-		}
-
-		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
-
-		for (Integer inputId: inputIds) {
-			streamGraph.addEdge(inputId, transform.getId(), 0);
-		}
-
-		return Collections.singleton(transform.getId());
-	}
-
-	/**
-	 * Transforms a {@code TwoInputTransformation}.
-	 *
-	 * <p>
-	 * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
-	 * wires the inputs to this new node.
-	 */
-	private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> transform) {
-
-		Collection<Integer> inputIds1 = transform(transform.getInput1());
-		Collection<Integer> inputIds2 = transform(transform.getInput2());
-
-		// the recursive call might have already transformed this
-		if (alreadyTransformed.containsKey(transform)) {
-			return alreadyTransformed.get(transform);
-		}
-
-		streamGraph.addCoOperator(
-				transform.getId(),
-				transform.getOperator(),
-				transform.getInputType1(),
-				transform.getInputType2(),
-				transform.getOutputType(),
-				transform.getName());
-
-		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
-
-		for (Integer inputId: inputIds1) {
-			streamGraph.addEdge(inputId,
-					transform.getId(),
-					1
-			);
-		}
-
-		for (Integer inputId: inputIds2) {
-			streamGraph.addEdge(inputId,
-					transform.getId(),
-					2
-			);
-		}
-
-		return Collections.singleton(transform.getId());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
deleted file mode 100644
index 608e648..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-
-/**
- * Class representing the operators in the streaming programs, with all their
- * properties.
- * 
- */
-public class StreamNode implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static int currentSlotSharingIndex = 1;
-
-	transient private StreamExecutionEnvironment env;
-
-	private Integer id;
-	private Integer parallelism = null;
-	private Long bufferTimeout = null;
-	private String operatorName;
-	private Integer slotSharingID;
-	private boolean isolatedSlot = false;
-	private KeySelector<?,?> statePartitioner;
-	private TypeSerializer<?> stateKeySerializer;
-
-	private transient StreamOperator<?> operator;
-	private List<OutputSelector<?>> outputSelectors;
-	private TypeSerializer<?> typeSerializerIn1;
-	private TypeSerializer<?> typeSerializerIn2;
-	private TypeSerializer<?> typeSerializerOut;
-
-	private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
-	private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
-
-	private Class<? extends AbstractInvokable> jobVertexClass;
-
-	private InputFormat<?, ?> inputFormat;
-
-	public StreamNode(StreamExecutionEnvironment env, Integer id, StreamOperator<?> operator,
-			String operatorName, List<OutputSelector<?>> outputSelector,
-			Class<? extends AbstractInvokable> jobVertexClass) {
-		this.env = env;
-		this.id = id;
-		this.operatorName = operatorName;
-		this.operator = operator;
-		this.outputSelectors = outputSelector;
-		this.jobVertexClass = jobVertexClass;
-		this.slotSharingID = currentSlotSharingIndex;
-	}
-
-	public void addInEdge(StreamEdge inEdge) {
-		if (inEdge.getTargetId() != getId()) {
-			throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
-		} else {
-			inEdges.add(inEdge);
-		}
-	}
-
-	public void addOutEdge(StreamEdge outEdge) {
-		if (outEdge.getSourceId() != getId()) {
-			throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
-		} else {
-			outEdges.add(outEdge);
-		}
-	}
-
-	public List<StreamEdge> getOutEdges() {
-		return outEdges;
-	}
-
-	public List<StreamEdge> getInEdges() {
-		return inEdges;
-	}
-
-	public List<Integer> getOutEdgeIndices() {
-		List<Integer> outEdgeIndices = new ArrayList<Integer>();
-
-		for (StreamEdge edge : outEdges) {
-			outEdgeIndices.add(edge.getTargetId());
-		}
-
-		return outEdgeIndices;
-	}
-
-	public List<Integer> getInEdgeIndices() {
-		List<Integer> inEdgeIndices = new ArrayList<Integer>();
-
-		for (StreamEdge edge : inEdges) {
-			inEdgeIndices.add(edge.getSourceId());
-		}
-
-		return inEdgeIndices;
-	}
-
-	public Integer getId() {
-		return id;
-	}
-
-	public int getParallelism() {
-		if (parallelism == -1) {
-			return env.getParallelism();
-		} else {
-			return parallelism;
-		}
-	}
-
-	public void setParallelism(Integer parallelism) {
-		this.parallelism = parallelism;
-	}
-
-	public Long getBufferTimeout() {
-		return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
-	}
-
-	public void setBufferTimeout(Long bufferTimeout) {
-		this.bufferTimeout = bufferTimeout;
-	}
-
-	public StreamOperator<?> getOperator() {
-		return operator;
-	}
-
-	public void setOperator(StreamOperator<?> operator) {
-		this.operator = operator;
-	}
-
-	public String getOperatorName() {
-		return operatorName;
-	}
-
-	public void setOperatorName(String operatorName) {
-		this.operatorName = operatorName;
-	}
-
-	public List<OutputSelector<?>> getOutputSelectors() {
-		return outputSelectors;
-	}
-
-	public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
-		return OutputSelectorWrapperFactory.create(getOutputSelectors());
-	}
-
-	public void addOutputSelector(OutputSelector<?> outputSelector) {
-		this.outputSelectors.add(outputSelector);
-	}
-
-	public TypeSerializer<?> getTypeSerializerIn1() {
-		return typeSerializerIn1;
-	}
-
-	public void setSerializerIn1(TypeSerializer<?> typeSerializerIn1) {
-		this.typeSerializerIn1 = typeSerializerIn1;
-	}
-
-	public TypeSerializer<?> getTypeSerializerIn2() {
-		return typeSerializerIn2;
-	}
-
-	public void setSerializerIn2(TypeSerializer<?> typeSerializerIn2) {
-		this.typeSerializerIn2 = typeSerializerIn2;
-	}
-
-	public TypeSerializer<?> getTypeSerializerOut() {
-		return typeSerializerOut;
-	}
-
-	public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
-		this.typeSerializerOut = typeSerializerOut;
-	}
-
-	public Class<? extends AbstractInvokable> getJobVertexClass() {
-		return jobVertexClass;
-	}
-
-	public InputFormat<?, ?> getInputFormat() {
-		return inputFormat;
-	}
-
-	public void setInputFormat(InputFormat<?, ?> inputFormat) {
-		this.inputFormat = inputFormat;
-	}
-
-	public int getSlotSharingID() {
-		return isolatedSlot ? -1 : slotSharingID;
-	}
-
-	public void startNewSlotSharingGroup() {
-		this.slotSharingID = ++currentSlotSharingIndex;
-	}
-
-	public void isolateSlot() {
-		isolatedSlot = true;
-	}
-	
-	@Override
-	public String toString() {
-		return operatorName + "-" + id;
-	}
-
-	public KeySelector<?, ?> getStatePartitioner() {
-		return statePartitioner;
-	}
-
-	public void setStatePartitioner(KeySelector<?, ?> statePartitioner) {
-		this.statePartitioner = statePartitioner;
-	}
-
-	public TypeSerializer<?> getStateKeySerializer() {
-		return stateKeySerializer;
-	}
-
-	public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
-		this.stateKeySerializer = stateKeySerializer;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		StreamNode that = (StreamNode) o;
-
-		return id.equals(that.id);
-	}
-
-	@Override
-	public int hashCode() {
-		return id.hashCode();
-	}
-}