You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:55 UTC

[14/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
new file mode 100644
index 0000000..9fa4d94
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+
+public class FileSourceFunction extends RichParallelSourceFunction<String> {
+	private static final long serialVersionUID = 1L;
+
+	private InputSplitProvider provider;
+
+	private InputFormat<String, ?> inputFormat;
+
+	private TypeInformation<String> typeInfo;
+
+	private volatile boolean isRunning;
+
+	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
+		this.inputFormat = format;
+		this.typeInfo = typeInfo;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		this.provider = context.getInputSplitProvider();
+		inputFormat.configure(context.getTaskStubParameters());
+	}
+
+	@Override
+	public void run(Collector<String> collector) throws Exception {
+		isRunning = true;
+		final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
+				.getExecutionConfig());
+		final Iterator<InputSplit> splitIterator = getInputSplits();
+		@SuppressWarnings("unchecked")
+		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
+		try {
+			while (isRunning && splitIterator.hasNext()) {
+
+				final InputSplit split = splitIterator.next();
+				String record = serializer.createInstance();
+
+				format.open(split);
+				while (isRunning && !format.reachedEnd()) {
+					if ((record = format.nextRecord(record)) != null) {
+						collector.collect(record);
+					}
+				}
+
+			}
+			collector.close();
+		} finally {
+			format.close();
+		}
+		isRunning = false;
+	}
+
+	private Iterator<InputSplit> getInputSplits() {
+
+		return new Iterator<InputSplit>() {
+
+			private InputSplit nextSplit;
+
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+
+				if (nextSplit != null) {
+					return true;
+				}
+
+				InputSplit split = provider.getNextInputSplit();
+
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				} else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public InputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final InputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
new file mode 100644
index 0000000..5bc1eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+	private static final long serialVersionUID = 1L;
+
+	Iterable<T> iterable;
+
+	private volatile boolean isRunning;
+
+	public FromElementsFunction(T... elements) {
+		this.iterable = Arrays.asList(elements);
+	}
+
+	public FromElementsFunction(Collection<T> elements) {
+		this.iterable = elements;
+	}
+
+	public FromElementsFunction(Iterable<T> elements) {
+		this.iterable = elements;
+	}
+
+	@Override
+	public void run(Collector<T> collector) throws Exception {
+		isRunning = true;
+		for (T element : iterable) {
+			if (isRunning) {
+				collector.collect(element);
+			} else {
+				break;
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
new file mode 100644
index 0000000..4878c1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NumberSequenceIterator;
+
+/**
+ * Source Function used to generate the number sequence
+ * 
+ */
+public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	private NumberSequenceIterator fullIterator;
+	private NumberSequenceIterator splitIterator;
+
+	private volatile boolean isRunning;
+
+	public GenSequenceFunction(long from, long to) {
+		fullIterator = new NumberSequenceIterator(from, to);
+	}
+
+	@Override
+	public void run(Collector<Long> collector) throws Exception {
+		isRunning = true;
+		while (splitIterator.hasNext() && isRunning) {
+			collector.collect(splitIterator.next());
+		}
+	}
+
+	@Override
+	public void open(Configuration config) {
+		int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
+		int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
new file mode 100644
index 0000000..72a18d0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public interface GenericSourceFunction<T> {
+
+	public TypeInformation<T> getType();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
new file mode 100644
index 0000000..c39a372
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * A stream data source that is executed in parallel. Upon execution, the runtime will
+ * execute as many parallel instances of this function function as configured parallelism
+ * of the source.
+ *
+ * <p>This interface acts only as a marker to tell the system that this source may
+ * be executed in parallel. When different parallel instances are required to perform
+ * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
+ * context, which revels information like the number of parallel tasks, and which parallel
+ * task the current instance is.
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
new file mode 100644
index 0000000..fcbcbce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
+		implements ParallelSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
new file mode 100644
index 0000000..dd08b2a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a parallel data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
+ *
+ * <p>This class is useful when implementing parallel sources where different parallel subtasks
+ * need to perform different work. Typical patterns for that are:
+ * <ul>
+ *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
+ *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
+ *         hard-wiring the parallelism, because the configured parallelism may change depending on
+ *         program configuration. The parallelism may also change after recovering failures, when fewer than
+ *         desired parallel worker as available.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
+ *         determine which subtask the current instance of the function executes.</li>
+ * </ul>
+ * </p>
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
new file mode 100644
index 0000000..b36ae39
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketTextStreamFunction extends RichSourceFunction<String> {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
+
+	private static final long serialVersionUID = 1L;
+
+	private String hostname;
+	private int port;
+	private char delimiter;
+	private long maxRetry;
+	private boolean retryForever;
+	private Socket socket;
+	private static final int CONNECTION_TIMEOUT_TIME = 0;
+	private static final int CONNECTION_RETRY_SLEEP = 1000;
+
+	private volatile boolean isRunning = false;
+
+	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
+		this.hostname = hostname;
+		this.port = port;
+		this.delimiter = delimiter;
+		this.maxRetry = maxRetry;
+		this.retryForever = maxRetry < 0;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		socket = new Socket();
+		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+	}
+
+	@Override
+	public void run(Collector<String> collector) throws Exception {
+		streamFromSocket(collector, socket);
+	}
+
+	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
+		isRunning = true;
+		try {
+			StringBuffer buffer = new StringBuffer();
+			BufferedReader reader = new BufferedReader(new InputStreamReader(
+					socket.getInputStream()));
+
+			while (isRunning) {
+				int data;
+				try {
+					data = reader.read();
+				} catch (SocketException e) {
+					if (!isRunning) {
+						break;
+					} else {
+						throw e;
+					}
+				}
+
+				if (data == -1) {
+					socket.close();
+					long retry = 0;
+					boolean success = false;
+					while (retry < maxRetry && !success) {
+						if (!retryForever) {
+							retry++;
+						}
+						LOG.warn("Lost connection to server socket. Retrying in "
+								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
+						try {
+							socket = new Socket();
+							socket.connect(new InetSocketAddress(hostname, port),
+									CONNECTION_TIMEOUT_TIME);
+							success = true;
+						} catch (ConnectException ce) {
+							Thread.sleep(CONNECTION_RETRY_SLEEP);
+						}
+					}
+
+					if (success) {
+						LOG.info("Server socket is reconnected.");
+					} else {
+						LOG.error("Could not reconnect to server socket.");
+						break;
+					}
+					reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+					continue;
+				}
+
+				if (data == delimiter) {
+					collector.collect(buffer.toString());
+					buffer = new StringBuffer();
+				} else if (data != '\r') { // ignore carriage return
+					buffer.append((char) data);
+				}
+			}
+
+			if (buffer.length() > 0) {
+				collector.collect(buffer.toString());
+			}
+		} finally {
+			socket.close();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		if (socket != null && !socket.isClosed()) {
+			try {
+				socket.close();
+			} catch (IOException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Could not close open socket");
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
new file mode 100644
index 0000000..9f0602f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface for a stream data source.
+ *
+ * <p>Sources implementing this specific interface are executed with
+ * parallelism 1. To execute your sources in parallel
+ * see {@link ParallelSourceFunction}.</p>
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+	/**
+	 * Main work method of the source. This function is invoked at the beginning of the
+	 * source's life and is expected to produce its data py "pushing" the records into
+	 * the given collector.
+	 *
+	 * @param collector The collector that forwards records to the source's consumers.
+	 *
+	 * @throws Exception Throwing any type of exception will cause the source to be considered
+	 *                   failed. When fault tolerance is enabled, recovery will be triggered,
+	 *                   which may create a new instance of this source.
+	 */
+	public void run(Collector<OUT> collector) throws Exception;
+
+	/**
+	 * This method signals the source function to cancel its operation
+	 * The method is called by the framework if the task is to be aborted prematurely.
+	 * This happens when the user cancels the job, or when the task is canceled as
+	 * part of a program failure and cleanup.
+	 */
+	public void cancel();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
new file mode 100644
index 0000000..8d8ded9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+public class JSONGenerator {
+
+	public static final String STEPS = "step_function";
+	public static final String ID = "id";
+	public static final String SIDE = "side";
+	public static final String SHIP_STRATEGY = "ship_strategy";
+	public static final String PREDECESSORS = "predecessors";
+	public static final String TYPE = "type";
+	public static final String PACT = "pact";
+	public static final String CONTENTS = "contents";
+	public static final String PARALLELISM = "parallelism";
+
+	private StreamGraph streamGraph;
+
+	public JSONGenerator(StreamGraph streamGraph) {
+		this.streamGraph = streamGraph;
+	}
+
+	public String getJSON() throws JSONException {
+		JSONObject json = new JSONObject();
+		JSONArray nodes = new JSONArray();
+		json.put("nodes", nodes);
+		List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
+		Collections.sort(operatorIDs);
+		visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
+		return json.toString();
+	}
+
+	private void visit(JSONArray jsonArray, List<Integer> toVisit,
+			Map<Integer, Integer> edgeRemapings) throws JSONException {
+
+		Integer vertexID = toVisit.get(0);
+		StreamNode vertex = streamGraph.getVertex(vertexID);
+
+		if (streamGraph.getSourceIDs().contains(vertexID)
+				|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
+
+			JSONObject node = new JSONObject();
+			decorateNode(vertexID, node);
+
+			if (!streamGraph.getSourceIDs().contains(vertexID)) {
+				JSONArray inputs = new JSONArray();
+				node.put(PREDECESSORS, inputs);
+
+				for (StreamEdge inEdge : vertex.getInEdges()) {
+					int inputID = inEdge.getSourceID();
+
+					Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
+							.get(inputID) : inputID;
+					decorateEdge(inputs, vertexID, mappedID, inputID);
+				}
+			}
+			jsonArray.put(node);
+			toVisit.remove(vertexID);
+		} else {
+			Integer iterationHead = -1;
+			for (StreamEdge inEdge : vertex.getInEdges()) {
+				int operator = inEdge.getSourceID();
+
+				if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
+					iterationHead = operator;
+				}
+			}
+
+			JSONObject obj = new JSONObject();
+			JSONArray iterationSteps = new JSONArray();
+			obj.put(STEPS, iterationSteps);
+			obj.put(ID, iterationHead);
+			obj.put(PACT, "IterativeDataStream");
+			obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
+			obj.put(CONTENTS, "Stream Iteration");
+			JSONArray iterationInputs = new JSONArray();
+			obj.put(PREDECESSORS, iterationInputs);
+			toVisit.remove(iterationHead);
+			visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
+			jsonArray.put(obj);
+		}
+
+		if (!toVisit.isEmpty()) {
+			visit(jsonArray, toVisit, edgeRemapings);
+		}
+	}
+
+	private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
+			Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
+
+		Integer vertexID = toVisit.get(0);
+		StreamNode vertex = streamGraph.getVertex(vertexID);
+		toVisit.remove(vertexID);
+
+		// Ignoring head and tail to avoid redundancy
+		if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) {
+			JSONObject obj = new JSONObject();
+			jsonArray.put(obj);
+			decorateNode(vertexID, obj);
+			JSONArray inEdges = new JSONArray();
+			obj.put(PREDECESSORS, inEdges);
+
+			for (StreamEdge inEdge : vertex.getInEdges()) {
+				int inputID = inEdge.getSourceID();
+
+				if (edgeRemapings.keySet().contains(inputID)) {
+					decorateEdge(inEdges, vertexID, inputID, inputID);
+				} else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) {
+					decorateEdge(iterationInEdges, vertexID, inputID, inputID);
+				}
+			}
+
+			edgeRemapings.put(vertexID, headId);
+			visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
+		}
+
+	}
+
+	private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
+			throws JSONException {
+		JSONObject input = new JSONObject();
+		inputArray.put(input);
+		input.put(ID, mappedInputID);
+		input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner()
+				.getStrategy());
+		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
+	}
+
+	private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
+
+		StreamNode vertex = streamGraph.getVertex(vertexID);
+
+		node.put(ID, vertexID);
+		node.put(TYPE, vertex.getOperatorName());
+
+		if (streamGraph.getSourceIDs().contains(vertexID)) {
+			node.put(PACT, "Data Source");
+		} else {
+			node.put(PACT, "Data Stream");
+		}
+
+		StreamOperator<?, ?> operator = streamGraph.getVertex(vertexID).getOperator();
+
+		if (operator != null && operator.getUserFunction() != null) {
+			node.put(CONTENTS, vertex.getOperatorName() + " at "
+					+ operator.getUserFunction().getClass().getSimpleName());
+		} else {
+			node.put(CONTENTS, vertex.getOperatorName());
+		}
+
+		node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
new file mode 100644
index 0000000..f5e771a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
+import org.apache.flink.util.InstantiationUtil;
+
+public class StreamConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
+	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+	private static final String CHAINED_OUTPUTS = "chainedOutputs";
+	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
+	private static final String OUTPUT_NAME = "outputName_";
+	private static final String VERTEX_NAME = "vertexID";
+	private static final String OPERATOR_NAME = "operatorName";
+	private static final String ITERATION_ID = "iterationId";
+	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
+	private static final String SERIALIZEDUDF = "serializedUDF";
+	private static final String USER_FUNCTION = "userFunction";
+	private static final String BUFFER_TIMEOUT = "bufferTimeout";
+	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+	private static final String ITERATON_WAIT = "iterationWait";
+	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
+	private static final String EDGES_IN_ORDER = "edgesInOrder";
+	private static final String OUT_STREAM_EDGES = "outStreamEdges";
+	private static final String IN_STREAM_EDGES = "inStreamEdges";
+
+	// DEFAULT VALUES
+	private static final long DEFAULT_TIMEOUT = 100;
+	public static final String STATE_MONITORING = "STATE_MONITORING";
+
+	// CONFIG METHODS
+
+	private Configuration config;
+
+	public StreamConfig(Configuration config) {
+		this.config = config;
+	}
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public void setVertexID(Integer vertexID) {
+		config.setInteger(VERTEX_NAME, vertexID);
+	}
+
+	public Integer getVertexID() {
+		return config.getInteger(VERTEX_NAME, -1);
+	}
+
+	public void setOperatorName(String name) {
+		config.setString(OPERATOR_NAME, name);
+	}
+
+	public String getOperatorName() {
+		return config.getString(OPERATOR_NAME, "Missing");
+	}
+
+	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
+	}
+
+	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
+	}
+
+	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
+	}
+
+	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
+	}
+
+	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+	}
+
+	public void setBufferTimeout(long timeout) {
+		config.setLong(BUFFER_TIMEOUT, timeout);
+	}
+
+	public long getBufferTimeout() {
+		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
+	}
+
+	public void setStreamOperator(StreamOperator<?, ?> operator) {
+		if (operator != null) {
+			config.setClass(USER_FUNCTION, operator.getClass());
+
+			try {
+				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(operator));
+			} catch (SerializationException e) {
+				throw new RuntimeException("Cannot serialize operator object "
+						+ operator.getClass(), e);
+			}
+		}
+	}
+
+	@SuppressWarnings({ "unchecked" })
+	public <T> T getStreamOperator(ClassLoader cl) {
+		try {
+			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Cannot instantiate user function", e);
+		}
+	}
+
+	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
+		try {
+			config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelectorWrapper");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
+		try {
+			return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					OUTPUT_SELECTOR_WRAPPER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
+		}
+	}
+
+	public void setIterationId(Integer iterationId) {
+		config.setInteger(ITERATION_ID, iterationId);
+	}
+
+	public Integer getIterationId() {
+		return config.getInteger(ITERATION_ID, 0);
+	}
+
+	public void setIterationWaitTime(long time) {
+		config.setLong(ITERATON_WAIT, time);
+	}
+
+	public long getIterationWaitTime() {
+		return config.getLong(ITERATON_WAIT, 0);
+	}
+
+	public void setSelectedNames(Integer output, List<String> selected) {
+		if (selected != null) {
+			config.setBytes(OUTPUT_NAME + output,
+					SerializationUtils.serialize((Serializable) selected));
+		} else {
+			config.setBytes(OUTPUT_NAME + output,
+					SerializationUtils.serialize(new ArrayList<String>()));
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getSelectedNames(Integer output) {
+		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
+				null));
+	}
+
+	public void setNumberOfInputs(int numberOfInputs) {
+		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
+	}
+
+	public int getNumberOfInputs() {
+		return config.getInteger(NUMBER_OF_INPUTS, 0);
+	}
+
+	public void setNumberOfOutputs(int numberOfOutputs) {
+		config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+	}
+
+	public int getNumberOfOutputs() {
+		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+	}
+
+	public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
+		config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
+		config.setBytes(CHAINED_OUTPUTS,
+				SerializationUtils.serialize((Serializable) chainedOutputs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
+					CHAINED_OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate chained outputs.");
+		}
+	}
+
+	public void setOutEdges(List<StreamEdge> outEdges) {
+		config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getOutEdges(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+					this.config, OUT_STREAM_EDGES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setInPhysicalEdges(List<StreamEdge> inEdges) {
+		config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+					this.config, IN_STREAM_EDGES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate inputs.");
+		}
+	}
+
+	public void setStateMonitoring(boolean stateMonitoring) {
+
+		config.setBoolean(STATE_MONITORING, stateMonitoring);
+
+	}
+
+	public boolean getStateMonitoring()
+	{
+		return config.getBoolean(STATE_MONITORING, false);
+	}
+
+	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
+		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
+		try {
+			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+					this.config, EDGES_IN_ORDER, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
+		config.setBytes(CHAINED_TASK_CONFIG,
+				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
+		try {
+
+			Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
+					.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
+
+			return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate configuration.");
+		}
+	}
+
+	public void setChainStart() {
+		config.setBoolean(IS_CHAINED_VERTEX, true);
+	}
+
+	public boolean isChainStart() {
+		return config.getBoolean(IS_CHAINED_VERTEX, false);
+	}
+
+	@Override
+	public String toString() {
+
+		ClassLoader cl = getClass().getClassLoader();
+
+		StringBuilder builder = new StringBuilder();
+		builder.append("\n=======================");
+		builder.append("Stream Config");
+		builder.append("=======================");
+		builder.append("\nTask name: " + getVertexID());
+		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
+		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
+		builder.append("\nOutput names: " + getNonChainedOutputs(cl));
+		builder.append("\nPartitioning:");
+		for (StreamEdge output : getNonChainedOutputs(cl)) {
+			int outputname = output.getTargetID();
+			builder.append("\n\t" + outputname + ": " + output.getPartitioner());
+		}
+
+		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+
+		try {
+			builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
+		} catch (Exception e) {
+			builder.append("\nOperator: Missing");
+		}
+		builder.append("\nBuffer timeout: " + getBufferTimeout());
+		builder.append("\nState Monitoring: " + getStateMonitoring());
+		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
+			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
+			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+		}
+
+		return builder.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
new file mode 100644
index 0000000..d34b21a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * An edge in the streaming topology. One edge like this does not necessarily
+ * gets converted to a connection between two job vertices (due to
+ * chaining/optimization).
+ */
+public class StreamEdge implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	final private String edgeId;
+
+	final private StreamNode sourceVertex;
+	final private StreamNode targetVertex;
+
+	/**
+	 * The type number of the input for co-tasks.
+	 */
+	final private int typeNumber;
+
+	/**
+	 * A list of output names that the target vertex listens to (if there is
+	 * output selection).
+	 */
+	final private List<String> selectedNames;
+	final private StreamPartitioner<?> outputPartitioner;
+
+	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
+			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+		this.sourceVertex = sourceVertex;
+		this.targetVertex = targetVertex;
+		this.typeNumber = typeNumber;
+		this.selectedNames = selectedNames;
+		this.outputPartitioner = outputPartitioner;
+
+		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+				+ "_" + outputPartitioner;
+	}
+
+	public StreamNode getSourceVertex() {
+		return sourceVertex;
+	}
+
+	public StreamNode getTargetVertex() {
+		return targetVertex;
+	}
+
+	public int getSourceID() {
+		return sourceVertex.getID();
+	}
+
+	public int getTargetID() {
+		return targetVertex.getID();
+	}
+
+	public int getTypeNumber() {
+		return typeNumber;
+	}
+
+	public List<String> getSelectedNames() {
+		return selectedNames;
+	}
+
+	public StreamPartitioner<?> getPartitioner() {
+		return outputPartitioner;
+	}
+
+	@Override
+	public int hashCode() {
+		return edgeId.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		StreamEdge that = (StreamEdge) o;
+
+		if (!edgeId.equals(that.edgeId)) {
+			return false;
+		}
+
+		return true;
+	}
+
+	@Override
+	public String toString() {
+		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+				+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+				+ ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
new file mode 100644
index 0000000..bfeed28
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CoStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class representing the streaming topology. It contains all the information
+ * necessary to build the jobgraph for the execution.
+ * 
+ */
+public class StreamGraph extends StreamingPlan {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+	private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
+	private String jobName = DEAFULT_JOB_NAME;
+
+	private final StreamExecutionEnvironment environemnt;
+	private final ExecutionConfig executionConfig;
+
+	private boolean checkpointingEnabled = false;
+	private long checkpointingInterval = 5000;
+	private boolean chaining = true;
+
+	private final Map<Integer, StreamNode> streamNodes;
+	private final Set<Integer> sources;
+
+	private final Map<Integer, StreamLoop> streamLoops;
+	protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+
+	public StreamGraph(StreamExecutionEnvironment environment) {
+
+		this.environemnt = environment;
+		executionConfig = environment.getConfig();
+
+		streamNodes = new HashMap<Integer, StreamNode>();
+		streamLoops = new HashMap<Integer, StreamLoop>();
+		vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
+		sources = new HashSet<Integer>();
+	}
+
+	protected ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	public void setJobName(String jobName) {
+		this.jobName = jobName;
+	}
+
+	public void setChaining(boolean chaining) {
+		this.chaining = chaining;
+	}
+
+	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+		this.checkpointingEnabled = checkpointingEnabled;
+	}
+
+	public void setCheckpointingInterval(long checkpointingInterval) {
+		this.checkpointingInterval = checkpointingInterval;
+	}
+
+	public long getCheckpointingInterval() {
+		return checkpointingInterval;
+	}
+
+	public boolean isChainingEnabled() {
+		return chaining;
+	}
+
+	public boolean isCheckpointingEnabled() {
+		return checkpointingEnabled;
+	}
+
+	public boolean isIterative() {
+		return !streamLoops.isEmpty();
+	}
+
+	public <IN, OUT> void addSource(Integer vertexID, StreamOperator<IN, OUT> operatorObject,
+			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+		addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+		sources.add(vertexID);
+	}
+
+	public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject,
+			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+
+		addNode(vertexID, StreamTask.class, operatorObject, operatorName);
+
+		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+				inTypeInfo, executionConfig) : null;
+
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+				outTypeInfo, executionConfig) : null;
+
+		setSerializers(vertexID, inSerializer, null, outSerializer);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Vertex: {}", vertexID);
+		}
+	}
+
+	public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
+			CoStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+
+		addNode(vertexID, CoStreamTask.class, taskoperatorObject, operatorName);
+
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+				outTypeInfo, executionConfig) : null;
+
+		setSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
+				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("CO-TASK: {}", vertexID);
+		}
+	}
+
+	public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
+			long timeOut) {
+
+		addNode(vertexID, StreamIterationHead.class, null, null);
+
+		chaining = false;
+
+		StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
+		streamLoops.put(iterationID, iteration);
+		vertexIDtoLoop.put(vertexID, iteration);
+
+		setSerializersFrom(iterationHead, vertexID);
+		getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
+
+		int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
+		StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
+				.get(0).getPartitioner();
+
+		addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ITERATION SOURCE: {}", vertexID);
+		}
+
+		sources.add(vertexID);
+	}
+
+	public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
+			long waitTime) {
+
+		if (getVertex(iterationTail).getBufferTimeout() == 0) {
+			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
+		}
+
+		addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
+				getVertex(iterationTail).getParallelism());
+
+		StreamLoop iteration = streamLoops.get(iterationID);
+		iteration.setTail(getVertex(iterationTail));
+		vertexIDtoLoop.put(vertexID, iteration);
+
+		setSerializersFrom(iterationTail, vertexID);
+		getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
+
+		setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
+		setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ITERATION SINK: {}", vertexID);
+		}
+
+	}
+
+	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
+			StreamOperator<?, ?> operatorObject, String operatorName) {
+
+		StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
+				new ArrayList<OutputSelector<?>>(), vertexClass);
+
+		streamNodes.put(vertexID, vertex);
+
+		return vertex;
+	}
+
+	public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
+			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
+
+		StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
+				getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
+		getVertex(edge.getSourceID()).addOutEdge(edge);
+		getVertex(edge.getTargetID()).addInEdge(edge);
+	}
+
+	public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
+		getVertex(vertexID).addOutputSelector(outputSelector);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Outputselector set for {}", vertexID);
+		}
+
+	}
+
+	public void setParallelism(Integer vertexID, int parallelism) {
+		getVertex(vertexID).setParallelism(parallelism);
+	}
+
+	public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
+		getVertex(vertexID).setBufferTimeout(bufferTimeout);
+	}
+
+	private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
+			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
+		StreamNode vertex = getVertex(vertexID);
+		vertex.setSerializerIn1(in1);
+		vertex.setSerializerIn2(in2);
+		vertex.setSerializerOut(out);
+	}
+
+	private void setSerializersFrom(Integer from, Integer to) {
+		StreamNode fromVertex = getVertex(from);
+		StreamNode toVertex = getVertex(to);
+
+		toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
+		toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
+	}
+
+	public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
+		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
+				executionConfig);
+		getVertex(vertexID).setSerializerOut(serializer);
+	}
+
+	public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject) {
+		getVertex(vertexID).setOperator(operatorObject);
+	}
+
+	public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
+		getVertex(vertexID).setInputFormat(inputFormat);
+	}
+
+	public StreamNode getVertex(Integer vertexID) {
+		return streamNodes.get(vertexID);
+	}
+
+	protected Collection<? extends Integer> getVertexIDs() {
+		return streamNodes.keySet();
+	}
+
+	protected StreamEdge getEdge(int sourceId, int targetId) {
+		Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator();
+		while (outIterator.hasNext()) {
+			StreamEdge edge = outIterator.next();
+
+			if (edge.getTargetID() == targetId) {
+				return edge;
+			}
+		}
+
+		throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
+	}
+
+	public Collection<Integer> getSourceIDs() {
+		return sources;
+	}
+
+	public Set<Tuple2<Integer, StreamOperator<?, ?>>> getOperators() {
+		Set<Tuple2<Integer, StreamOperator<?, ?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?, ?>>>();
+		for (StreamNode vertex : streamNodes.values()) {
+			operatorSet.add(new Tuple2<Integer, StreamOperator<?, ?>>(vertex.getID(), vertex
+					.getOperator()));
+		}
+		return operatorSet;
+	}
+
+	public Collection<StreamLoop> getStreamLoops() {
+		return streamLoops.values();
+	}
+
+	public Integer getLoopID(Integer vertexID) {
+		return vertexIDtoLoop.get(vertexID).getID();
+	}
+
+	public long getLoopTimeout(Integer vertexID) {
+		return vertexIDtoLoop.get(vertexID).getTimeout();
+	}
+
+	protected void removeEdge(StreamEdge edge) {
+
+		edge.getSourceVertex().getOutEdges().remove(edge);
+		edge.getTargetVertex().getInEdges().remove(edge);
+
+	}
+
+	protected void removeVertex(StreamNode toRemove) {
+
+		Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
+
+		edgesToRemove.addAll(toRemove.getInEdges());
+		edgesToRemove.addAll(toRemove.getOutEdges());
+
+		for (StreamEdge edge : edgesToRemove) {
+			removeEdge(edge);
+		}
+		streamNodes.remove(toRemove.getID());
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} and adds a default name for it.
+	 */
+	public JobGraph getJobGraph() {
+		return getJobGraph(jobName);
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} and adds a user specified name for
+	 * it.
+	 * 
+	 * @param jobGraphName
+	 *            name of the jobGraph
+	 */
+	public JobGraph getJobGraph(String jobGraphName) {
+
+		// temporarily forbid checkpointing for iterative jobs
+		if (isIterative() && isCheckpointingEnabled()) {
+			throw new UnsupportedOperationException(
+					"Checkpointing is currently not supported for iterative jobs!");
+		}
+
+		setJobName(jobGraphName);
+
+		WindowingOptimizer.optimizeGraph(this);
+
+		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
+
+		return jobgraphGenerator.createJobGraph(jobGraphName);
+	}
+
+	@Override
+	public String getStreamingPlanAsJSON() {
+
+		WindowingOptimizer.optimizeGraph(this);
+
+		try {
+			return new JSONGenerator(this).getJSON();
+		} catch (JSONException e) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("JSON plan creation failed: {}", e);
+			}
+			return "";
+		}
+
+	}
+
+	@Override
+	public void dumpStreamingPlanAsJSON(File file) throws IOException {
+		PrintWriter pw = null;
+		try {
+			pw = new PrintWriter(new FileOutputStream(file), false);
+			pw.write(getStreamingPlanAsJSON());
+			pw.flush();
+
+		} finally {
+			if (pw != null) {
+				pw.close();
+			}
+		}
+	}
+
+	/**
+	 * Object for representing loops in streaming programs.
+	 * 
+	 */
+	protected static class StreamLoop {
+
+		private Integer loopID;
+
+		private StreamNode head;
+		private StreamNode tail;
+
+		private Long timeout;
+
+		public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
+			this.loopID = loopID;
+			this.head = head;
+			this.timeout = timeout;
+		}
+
+		public Integer getID() {
+			return loopID;
+		}
+
+		public Long getTimeout() {
+			return timeout;
+		}
+
+		public void setTail(StreamNode tail) {
+			this.tail = tail;
+		}
+
+		public StreamNode getHead() {
+			return head;
+		}
+
+		public StreamNode getTail() {
+			return tail;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
new file mode 100644
index 0000000..cb07f42
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
+/**
+ * Class representing the operators in the streaming programs, with all their
+ * properties.
+ * 
+ */
+public class StreamNode implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	transient private StreamExecutionEnvironment env;
+
+	private Integer ID;
+	private Integer parallelism = null;
+	private Long bufferTimeout = null;
+	private String operatorName;
+
+	private StreamOperator<?, ?> operator;
+	private List<OutputSelector<?>> outputSelectors;
+	private StreamRecordSerializer<?> typeSerializerIn1;
+	private StreamRecordSerializer<?> typeSerializerIn2;
+	private StreamRecordSerializer<?> typeSerializerOut;
+
+	private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
+	private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
+
+	private Class<? extends AbstractInvokable> jobVertexClass;
+
+	private InputFormat<String, ?> inputFormat;
+
+	public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?, ?> operator,
+			String operatorName, List<OutputSelector<?>> outputSelector,
+			Class<? extends AbstractInvokable> jobVertexClass) {
+		this.env = env;
+		this.ID = ID;
+		this.operatorName = operatorName;
+		this.operator = operator;
+		this.outputSelectors = outputSelector;
+		this.jobVertexClass = jobVertexClass;
+	}
+
+	public void addInEdge(StreamEdge inEdge) {
+		if (inEdge.getTargetID() != getID()) {
+			throw new IllegalArgumentException("Destination ID doesn't match the StreamNode ID");
+		} else {
+			inEdges.add(inEdge);
+		}
+	}
+
+	public void addOutEdge(StreamEdge outEdge) {
+		if (outEdge.getSourceID() != getID()) {
+			throw new IllegalArgumentException("Source ID doesn't match the StreamNode ID");
+		} else {
+			outEdges.add(outEdge);
+		}
+	}
+
+	public List<StreamEdge> getOutEdges() {
+		return outEdges;
+	}
+
+	public List<StreamEdge> getInEdges() {
+		return inEdges;
+	}
+
+	public List<Integer> getOutEdgeIndices() {
+		List<Integer> outEdgeIndices = new ArrayList<Integer>();
+
+		for (StreamEdge edge : outEdges) {
+			outEdgeIndices.add(edge.getTargetID());
+		}
+
+		return outEdgeIndices;
+	}
+
+	public List<Integer> getInEdgeIndices() {
+		List<Integer> inEdgeIndices = new ArrayList<Integer>();
+
+		for (StreamEdge edge : inEdges) {
+			inEdgeIndices.add(edge.getSourceID());
+		}
+
+		return inEdgeIndices;
+	}
+
+	public Integer getID() {
+		return ID;
+	}
+
+	public Integer getParallelism() {
+		return parallelism != null ? parallelism : env.getParallelism();
+	}
+
+	public void setParallelism(Integer parallelism) {
+		this.parallelism = parallelism;
+	}
+
+	public Long getBufferTimeout() {
+		return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
+	}
+
+	public void setBufferTimeout(Long bufferTimeout) {
+		this.bufferTimeout = bufferTimeout;
+	}
+
+	public StreamOperator<?, ?> getOperator() {
+		return operator;
+	}
+
+	public void setOperator(StreamOperator<?, ?> operator) {
+		this.operator = operator;
+	}
+
+	public String getOperatorName() {
+		return operatorName;
+	}
+
+	public void setOperatorName(String operatorName) {
+		this.operatorName = operatorName;
+	}
+
+	public List<OutputSelector<?>> getOutputSelectors() {
+		return outputSelectors;
+	}
+
+	public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
+		return OutputSelectorWrapperFactory.create(getOutputSelectors());
+	}
+
+	public void addOutputSelector(OutputSelector<?> outputSelector) {
+		this.outputSelectors.add(outputSelector);
+	}
+
+	public StreamRecordSerializer<?> getTypeSerializerIn1() {
+		return typeSerializerIn1;
+	}
+
+	public void setSerializerIn1(StreamRecordSerializer<?> typeSerializerIn1) {
+		this.typeSerializerIn1 = typeSerializerIn1;
+	}
+
+	public StreamRecordSerializer<?> getTypeSerializerIn2() {
+		return typeSerializerIn2;
+	}
+
+	public void setSerializerIn2(StreamRecordSerializer<?> typeSerializerIn2) {
+		this.typeSerializerIn2 = typeSerializerIn2;
+	}
+
+	public StreamRecordSerializer<?> getTypeSerializerOut() {
+		return typeSerializerOut;
+	}
+
+	public void setSerializerOut(StreamRecordSerializer<?> typeSerializerOut) {
+		this.typeSerializerOut = typeSerializerOut;
+	}
+
+	public Class<? extends AbstractInvokable> getJobVertexClass() {
+		return jobVertexClass;
+	}
+
+	public InputFormat<String, ?> getInputFormat() {
+		return inputFormat;
+	}
+
+	public void setInputFormat(InputFormat<String, ?> inputFormat) {
+		this.inputFormat = inputFormat;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
new file mode 100644
index 0000000..7b856a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.PartitioningStrategy;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingJobGraphGenerator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
+
+	private StreamGraph streamGraph;
+
+	private Map<Integer, AbstractJobVertex> jobVertices;
+	private JobGraph jobGraph;
+	private Collection<Integer> builtVertices;
+
+	private List<StreamEdge> physicalEdgesInOrder;
+
+	private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+
+	private Map<Integer, StreamConfig> vertexConfigs;
+	private Map<Integer, String> chainedNames;
+
+	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+		this.streamGraph = streamGraph;
+	}
+
+	private void init() {
+		this.jobVertices = new HashMap<Integer, AbstractJobVertex>();
+		this.builtVertices = new HashSet<Integer>();
+		this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
+		this.vertexConfigs = new HashMap<Integer, StreamConfig>();
+		this.chainedNames = new HashMap<Integer, String>();
+		this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
+	}
+
+	public JobGraph createJobGraph(String jobName) {
+		jobGraph = new JobGraph(jobName);
+
+		// Turn lazy scheduling off
+		jobGraph.setScheduleMode(ScheduleMode.ALL);
+		jobGraph.setJobType(JobGraph.JobType.STREAMING);
+		jobGraph.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
+		jobGraph.setCheckpointingInterval(streamGraph.getCheckpointingInterval());
+
+		if (jobGraph.isCheckpointingEnabled()) {
+			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+			if (executionRetries != -1) {
+				jobGraph.setNumberOfExecutionRetries(executionRetries);
+			} else {
+				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+			}
+		}
+		init();
+
+		setChaining();
+
+		setPhysicalEdges();
+
+		setSlotSharing();
+
+		return jobGraph;
+	}
+
+	private void setPhysicalEdges() {
+		Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
+
+		for (StreamEdge edge : physicalEdgesInOrder) {
+			int target = edge.getTargetID();
+
+			List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
+
+			// create if not set
+			if (inEdges == null) {
+				inEdges = new ArrayList<StreamEdge>();
+				physicalInEdgesInOrder.put(target, inEdges);
+			}
+
+			inEdges.add(edge);
+		}
+
+		for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
+			int vertex = inEdges.getKey();
+			List<StreamEdge> edgeList = inEdges.getValue();
+
+			vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
+		}
+	}
+
+	private void setChaining() {
+		for (Integer sourceName : streamGraph.getSourceIDs()) {
+			createChain(sourceName, sourceName);
+		}
+	}
+
+	private List<StreamEdge> createChain(Integer startNode, Integer current) {
+
+		if (!builtVertices.contains(startNode)) {
+
+			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
+
+			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
+			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
+
+			for (StreamEdge outEdge : streamGraph.getVertex(current).getOutEdges()) {
+				if (isChainable(outEdge)) {
+					chainableOutputs.add(outEdge);
+				} else {
+					nonChainableOutputs.add(outEdge);
+				}
+			}
+
+			for (StreamEdge chainable : chainableOutputs) {
+				transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetID()));
+			}
+
+			for (StreamEdge nonChainable : nonChainableOutputs) {
+				transitiveOutEdges.add(nonChainable);
+				createChain(nonChainable.getTargetID(), nonChainable.getTargetID());
+			}
+
+			chainedNames.put(current, createChainedName(current, chainableOutputs));
+
+			StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode)
+					: new StreamConfig(new Configuration());
+
+			setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
+
+			if (current.equals(startNode)) {
+
+				config.setChainStart();
+				config.setOutEdgesInOrder(transitiveOutEdges);
+				config.setOutEdges(streamGraph.getVertex(current).getOutEdges());
+
+				for (StreamEdge edge : transitiveOutEdges) {
+					connect(startNode, edge);
+				}
+
+				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
+
+			} else {
+
+				Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNode);
+
+				if (chainedConfs == null) {
+					chainedConfigs.put(startNode, new HashMap<Integer, StreamConfig>());
+				}
+				chainedConfigs.get(startNode).put(current, config);
+			}
+
+			return transitiveOutEdges;
+
+		} else {
+			return new ArrayList<StreamEdge>();
+		}
+	}
+
+	private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
+		String operatorName = streamGraph.getVertex(vertexID).getOperatorName();
+		if (chainedOutputs.size() > 1) {
+			List<String> outputChainedNames = new ArrayList<String>();
+			for (StreamEdge chainable : chainedOutputs) {
+				outputChainedNames.add(chainedNames.get(chainable.getTargetID()));
+			}
+			String returnOperatorName = operatorName + " -> ("
+					+ StringUtils.join(outputChainedNames, ", ") + ")";
+			return returnOperatorName;
+		} else if (chainedOutputs.size() == 1) {
+			String returnOperatorName = operatorName + " -> "
+					+ chainedNames.get(chainedOutputs.get(0).getTargetID());
+			return returnOperatorName;
+		} else {
+			return operatorName;
+		}
+
+	}
+
+	private StreamConfig createProcessingVertex(Integer vertexID) {
+
+		AbstractJobVertex jobVertex = new AbstractJobVertex(chainedNames.get(vertexID));
+		StreamNode vertex = streamGraph.getVertex(vertexID);
+
+		jobVertex.setInvokableClass(vertex.getJobVertexClass());
+
+		int parallelism = vertex.getParallelism();
+
+		if (parallelism > 0) {
+			jobVertex.setParallelism(parallelism);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
+		}
+
+		if (vertex.getInputFormat() != null) {
+			jobVertex.setInputSplitSource(vertex.getInputFormat());
+		}
+
+		jobVertices.put(vertexID, jobVertex);
+		builtVertices.add(vertexID);
+		jobGraph.addVertex(jobVertex);
+
+		StreamConfig retConfig = new StreamConfig(jobVertex.getConfiguration());
+		retConfig.setOperatorName(chainedNames.get(vertexID));
+		return retConfig;
+	}
+
+	private void setVertexConfig(Integer vertexID, StreamConfig config,
+			List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
+
+		StreamNode vertex = streamGraph.getVertex(vertexID);
+
+		config.setVertexID(vertexID);
+		config.setBufferTimeout(vertex.getBufferTimeout());
+
+		config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
+		config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
+		config.setTypeSerializerOut1(vertex.getTypeSerializerOut());
+
+		config.setStreamOperator(vertex.getOperator());
+		config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
+
+		config.setNumberOfOutputs(nonChainableOutputs.size());
+		config.setNonChainedOutputs(nonChainableOutputs);
+		config.setChainedOutputs(chainableOutputs);
+		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+
+		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
+
+		if (vertexClass.equals(StreamIterationHead.class)
+				|| vertexClass.equals(StreamIterationTail.class)) {
+			config.setIterationId(streamGraph.getLoopID(vertexID));
+			config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
+		}
+
+		List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
+		allOutputs.addAll(nonChainableOutputs);
+
+		for (StreamEdge output : allOutputs) {
+			config.setSelectedNames(output.getTargetID(),
+					streamGraph.getEdge(vertexID, output.getTargetID()).getSelectedNames());
+		}
+
+		vertexConfigs.put(vertexID, config);
+	}
+
+	private void connect(Integer headOfChain, StreamEdge edge) {
+
+		physicalEdgesInOrder.add(edge);
+
+		Integer downStreamvertexID = edge.getTargetID();
+
+		AbstractJobVertex headVertex = jobVertices.get(headOfChain);
+		AbstractJobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
+
+		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
+
+		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
+
+		StreamPartitioner<?> partitioner = edge.getPartitioner();
+		if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
+			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+		} else {
+			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
+					headOfChain, downStreamvertexID);
+		}
+	}
+
+	private boolean isChainable(StreamEdge edge) {
+		StreamNode upStreamVertex = edge.getSourceVertex();
+		StreamNode downStreamVertex = edge.getTargetVertex();
+
+		StreamOperator<?, ?> headOperator = upStreamVertex.getOperator();
+		StreamOperator<?, ?> outOperator = downStreamVertex.getOperator();
+
+		return downStreamVertex.getInEdges().size() == 1
+				&& outOperator != null
+				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator
+						.getChainingStrategy() == ChainingStrategy.ALWAYS)
+				&& (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
+						.getParallelism() == 1)
+				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+				&& streamGraph.isChainingEnabled();
+	}
+
+	private void setSlotSharing() {
+		SlotSharingGroup shareGroup = new SlotSharingGroup();
+
+		for (AbstractJobVertex vertex : jobVertices.values()) {
+			vertex.setSlotSharingGroup(shareGroup);
+		}
+
+		for (StreamLoop loop : streamGraph.getStreamLoops()) {
+			CoLocationGroup ccg = new CoLocationGroup();
+			AbstractJobVertex tail = jobVertices.get(loop.getTail().getID());
+			AbstractJobVertex head = jobVertices.get(loop.getHead().getID());
+
+			ccg.addVertex(head);
+			ccg.addVertex(tail);
+		}
+	}
+}