You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:40 UTC

[16/63] [abbrv] Refactor job graph construction to incremental attachment based

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
deleted file mode 100644
index 2f89a80..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FormatUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.io;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
- */
-public class FormatUtil {
-
-
-	/**
-	 * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
-	 */
-	public static <T, F extends FileInputFormat<T>> F openInput(
-			Class<F> inputFormatClass, String path, Configuration configuration)
-		throws IOException
-	{
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		Path normalizedPath = normalizePath(new Path(path));
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-
-		inputFormat.setFilePath(normalizedPath);
-		inputFormat.setOpenTimeout(0);
-		inputFormat.configure(configuration);
-
-		final FileSystem fs = FileSystem.get(normalizedPath.toUri());
-		FileStatus fileStatus = fs.getFileStatus(normalizedPath);
-
-		BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-		inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
-		return inputFormat;
-	}
-
-	/**
-	 * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
-	 * initializes the formats.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file or to the directory containing the splits
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}s for each file in the specified path
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the files or initializing the InputFormat.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
-			Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
-		Path nephelePath = new Path(path);
-		FileSystem fs = nephelePath.getFileSystem();
-		FileStatus fileStatus = fs.getFileStatus(nephelePath);
-		if (!fileStatus.isDir()) {
-			return Arrays.asList(openInput(inputFormatClass, path, configuration));
-		}
-		FileStatus[] list = fs.listStatus(nephelePath);
-		List<F> formats = new ArrayList<F>();
-		for (int index = 0; index < list.length; index++) {
-			formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
-		}
-		return formats;
-	}
-
-	/**
-	 * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
-	 */
-	public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
-			Class<F> inputFormatClass, Configuration configuration) throws IOException {
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-		inputFormat.configure(configuration);
-		final IS[] splits = inputFormat.createInputSplits(1);
-		inputFormat.open(splits[0]);
-		return inputFormat;
-	}
-	
-	/**
-	 * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the class of the OutputFormat
-	 * @param outputFormatClass
-	 *        the class of the OutputFormat
-	 * @param path
-	 *        the path of the file or to the directory containing the splits
-	 * @param configuration
-	 *        optional configuration of the OutputFormat
-	 * @return the created {@link OutputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the OutputFormat.
-	 */
-	public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
-			Class<F> outputFormatClass, String path, Configuration configuration) 
-		throws IOException
-	{
-		final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
-		outputFormat.setOutputFilePath(new Path(path));
-		outputFormat.setWriteMode(WriteMode.OVERWRITE);
-	
-		configuration = configuration == null ? new Configuration() : configuration;
-		
-		outputFormat.configure(configuration);
-		outputFormat.open(0, 1);
-		return outputFormat;
-	}
-
-	/**
-	 * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
-	 */
-	private static Path normalizePath(Path path) {
-		URI uri = path.toUri();
-		if (uri.getScheme() == null) {
-			try {
-				uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
-				path = new Path(uri.toString());
-			} catch (URISyntaxException e) {
-				throw new IllegalArgumentException("path is invalid", e);
-			}
-		}
-		return path;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
index 0ddeb64..c108471 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericInputFormat.java
@@ -50,7 +50,6 @@ public abstract class GenericInputFormat<OT> implements InputFormat<OT, GenericI
 		return cachedStatistics;
 	}
 
-
 	@Override
 	public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
 		if (numSplits < 1) {
@@ -66,8 +65,8 @@ public abstract class GenericInputFormat<OT> implements InputFormat<OT, GenericI
 	}
 	
 	@Override
-	public Class<? extends GenericInputSplit> getInputSplitType() {
-		return GenericInputSplit.class;
+	public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+		return new DefaultInputSplitAssigner(splits);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
index 1bb7815..5eaa657 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InitializeOnMaster.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
index cb4019c..6845237 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;
@@ -25,6 +24,8 @@ import java.io.Serializable;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
 
 /**
  * The base interface for data sources that produces records.
@@ -59,7 +60,7 @@ import org.apache.flink.core.io.InputSplit;
  * @param <OT> The type of the produced records.
  * @param <T> The type of input split.
  */
-public interface InputFormat<OT, T extends InputSplit> extends Serializable {
+public interface InputFormat<OT, T extends InputSplit> extends InputSplitSource<T>, Serializable {
 	
 	/**
 	 * Configures this input format. Since input formats are instantiated generically and hence parameterless, 
@@ -95,6 +96,7 @@ public interface InputFormat<OT, T extends InputSplit> extends Serializable {
 	 * 
 	 * @throws IOException Thrown, when the creation of the splits was erroneous.
 	 */
+	@Override
 	T[] createInputSplits(int minNumSplits) throws IOException;
 	
 	/**
@@ -102,7 +104,8 @@ public interface InputFormat<OT, T extends InputSplit> extends Serializable {
 	 * 
 	 * @return The type of the input splits.
 	 */
-	Class<? extends T> getInputSplitType();
+	@Override
+	InputSplitAssigner getInputSplitAssigner(T[] inputSplits);
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
new file mode 100644
index 0000000..6243681
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.LocatableInputSplit;
+
+/**
+ * The locatable input split assigner assigns to each host splits that are local, before assigning
+ * splits that are not local. 
+ */
+public final class LocatableInputSplitAssigner implements InputSplitAssigner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
+
+
+	private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
+	
+	private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
+	
+	private int localAssignments;		// lock protected by the unassigned set lock
+	
+	private int remoteAssignments;		// lock protected by the unassigned set lock
+
+	// --------------------------------------------------------------------------------------------
+	
+	public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
+		this.unassigned.addAll(splits);
+	}
+	
+	public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
+		Collections.addAll(this.unassigned, splits);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public LocatableInputSplit getNextInputSplit(String host) {
+		// for a null host, we return an arbitrary split
+		if (host == null) {
+			
+			synchronized (this.unassigned) {
+				Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+				if (iter.hasNext()) {
+					LocatableInputSplit next = iter.next();
+					iter.remove();
+					
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Assigning arbitrary split to null host.");
+					}
+					
+					remoteAssignments++;
+					return next;
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("No more input splits remaining.");
+					}
+					return null;
+				}
+			}
+		}
+		
+		host = host.toLowerCase(Locale.US);
+		
+	// for any non-null host, we take the list of local splits
+		List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
+		
+		// if we have no list for this host yet, create one
+		if (localSplits == null) {
+			localSplits = new ArrayList<LocatableInputSplit>(16);
+			
+			// lock the list, to be sure that others have to wait for that host's local list
+			synchronized (localSplits) {
+				List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
+				
+				// if someone else beat us in the race to create this list, then we do not populate this one, but
+				// simply work with that other list
+				if (prior == null) {
+					// we are the first, we populate
+					
+					// first, copy the remaining splits to release the lock on the set early
+					// because that is shared among threads
+					LocatableInputSplit[] remaining;
+					synchronized (this.unassigned) {
+						remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+					}
+					
+					for (LocatableInputSplit is : remaining) {
+						if (isLocal(host, is.getHostnames())) {
+							localSplits.add(is);
+						}
+					}
+				}
+				else {
+					// someone else was faster
+					localSplits = prior;
+				}
+			}
+		}
+		
+		// at this point, we have a list of local splits (possibly empty)
+		// we need to make sure no one else operates in the current list (that protects against
+		// list creation races) and that the unassigned set is consistent
+		// NOTE: we need to obtain the locks in this order, strictly!!!
+		synchronized (localSplits) {
+			int size = localSplits.size();
+			if (size > 0) {
+				synchronized (this.unassigned) {
+					do {
+						--size;
+						LocatableInputSplit split = localSplits.remove(size);
+						if (this.unassigned.remove(split)) {
+							
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Assigning local split to host " + host);
+							}
+							
+							localAssignments++;
+							return split;
+						}
+					} while (size > 0);
+				}
+			}
+		}
+		
+		// we did not find a local split, return any
+		synchronized (this.unassigned) {
+			Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+			if (iter.hasNext()) {
+				LocatableInputSplit next = iter.next();
+				iter.remove();
+				
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Assigning remote split to host " + host);
+				}
+				
+				remoteAssignments++;
+				return next;
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("No more input splits remaining.");
+				}
+				return null;
+			}
+		}
+	}
+	
+	private static final boolean isLocal(String host, String[] hosts) {
+		if (host == null || hosts == null) {
+			return false;
+		}
+		
+		for (String h : hosts) {
+			if (h != null && host.equals(h.toLowerCase())) {
+				return true;
+			}
+		}
+		
+		return false;
+	}
+	
+	public int getNumberOfLocalAssignments() {
+		return localAssignments;
+	}
+	
+	public int getNumberOfRemoteAssignments() {
+		return remoteAssignments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
index 157df71..d4fce5b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/NonParallelInput.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.api.common.io;
 
-
 /**
  * This interface acts as a marker for input formats for inputs which cannot be split.
- * Data sources with a Sequential input formats are always executed with a degree-of-parallelism
+ * Data sources with a non-parallel input format are always executed with a degree-of-parallelism
  * of one.
  * 
  * @see InputFormat

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
index cd02ac9..ddf9cbc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;
@@ -24,7 +23,6 @@ import java.io.Serializable;
 
 import org.apache.flink.configuration.Configuration;
 
-
 /**
  * The base interface for outputs that consumes records. The output format
  * describes how to store the final records, for example in a file.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index c4a69ba..ebee5d0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -151,19 +151,12 @@ public class FileInputSplit extends LocatableInputSplit {
 		if (obj == this) {
 			return true;
 		}
-		else if (obj != null && super.equals(obj) && obj instanceof FileInputSplit) {
+		else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) {
 			FileInputSplit other = (FileInputSplit) obj;
 			
-			if (this.file != null) {
-				if (!this.file.equals(other.file)) {
-					return false;
-				}
-			}
-			else if (other.file != null) {
-				return false;
-			}
-			
-			return this.start == other.start && this.length == other.length;
+			return this.start == other.start &&
+					this.length == other.length &&
+					(this.file == null ? other.file == null : (other.file != null && this.file.equals(other.file)));
 		}
 		else {
 			return false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index 850ba1c..52018a1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -78,7 +78,7 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable {
 	}
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		this.partitionNumber = in.readInt();
 		this.totalNumberOfPartitions = in.readInt();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index c054fb8..1a51207 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.io;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
new file mode 100644
index 0000000..256b9c7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitSource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * InputSplitSources create {@link InputSplit}s that define portions of data to be produced
+ * by {@link org.apache.flink.api.common.io.InputFormat}s.
+ *
+ * @param <T> The type of the input splits created by the source.
+ */
+public interface InputSplitSource<T extends InputSplit> extends java.io.Serializable {
+
+	/**
+	 * Computes the input splits. The given minimum number of splits is a hint as to how
+	 * many splits are desired.
+	 *
+	 * @param minNumSplits The minimum desired number of input splits, as a hint.
+	 * @return An array of input splits.
+	 * 
+	 * @throws Exception Exceptions when creating the input splits may be forwarded and will cause the
+	 *                   execution to permanently fail.
+	 */
+	T[] createInputSplits(int minNumSplits) throws Exception;
+	
+	/**
+	 * Returns the assigner for the input splits. The assigner determines which parallel instance of the
+	 * input format gets which input split.
+	 *
+	 * @return The input split assigner.
+	 */
+	InputSplitAssigner getInputSplitAssigner(T[] inputSplits);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
new file mode 100644
index 0000000..25835f5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public final class DataInputViewStream extends InputStream {
+
+	private final DataInputView inputView;
+	
+	
+	public DataInputViewStream(DataInputView inputView) {
+		this.inputView = inputView;
+	}
+
+	
+	@Override
+	public int read() throws IOException {
+		return inputView.readByte();
+	}
+	
+	public int read(byte b[], int off, int len) throws IOException {
+		inputView.readFully(b, off, len);
+		return len;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
new file mode 100644
index 0000000..f3188aa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public final class DataOutputViewStream extends OutputStream {
+
+	private final DataOutputView outputView;
+	
+	
+	public DataOutputViewStream(DataOutputView outputView) {
+		this.outputView = outputView;
+	}
+
+
+	@Override
+	public void write(int b) throws IOException {
+		outputView.write(b);
+	}
+	
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		outputView.write(b, off, len);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
index 00ad8e5..7e891bb 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.util;
 
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
new file mode 100644
index 0000000..8f9822e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class ExceptionUtils {
+
+	/**
+	 * Makes a string representation of the exception's stack trace.
+	 * 
+	 * @param e The exception to stringify.
+	 * @return A string with exception name and call stack.
+	 */
+	public static String stringifyException(final Throwable e) {
+		final StringWriter stm = new StringWriter();
+		final PrintWriter wrt = new PrintWriter(stm);
+		e.printStackTrace(wrt);
+		wrt.close();
+		return stm.toString();
+	}
+	
+	/**
+	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
+	 * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions
+	 * are packed into runtime exceptions
+	 * 
+	 * @param t The throwable to be thrown.
+	 */
+	public static void rethrow(Throwable t) {
+		if (t instanceof Error) {
+			throw (Error) t;
+		}
+		else if (t instanceof RuntimeException) {
+			throw (RuntimeException) t;
+		}
+		else {
+			throw new RuntimeException(t);
+		}
+	}
+	
+	/**
+	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
+	 * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions
+	 * are packed into a parent RuntimeException.
+	 * 
+	 * @param t The throwable to be thrown.
+	 * @param parentMessage The message for the parent RuntimeException, if one is needed.
+	 */
+	public static void rethrow(Throwable t, String parentMessage) {
+		if (t instanceof Error) {
+			throw (Error) t;
+		}
+		else if (t instanceof RuntimeException) {
+			throw (RuntimeException) t;
+		}
+		else {
+			throw new RuntimeException(parentMessage, t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
index 2bdbc60..755e188 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NumberSequenceIterator.java
@@ -21,13 +21,10 @@ package org.apache.flink.util;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-
-
-/**
- *
- */
 /**
- *
+ * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long}s).
+ * The iterator is splittable (as defined by {@link SplittableIterator}), i.e., it can be divided into multiple
+ * iterators that each return a subsequence of the number sequence.
  */
 public class NumberSequenceIterator implements SplittableIterator<Long> {
 	
@@ -38,7 +35,12 @@ public class NumberSequenceIterator implements SplittableIterator<Long> {
 	private long current;
 	
 	
-	
+	/**
+	 * Creates a new iterator, returning the sequence of numbers from {@code from} to {@code to} (both inclusive).
+	 * 
+	 * @param from The first number returned by the iterator.
+	 * @param to The last number returned by the iterator.
+	 */
 	public NumberSequenceIterator(long from, long to) {
 		if (from > to) {
 			throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
@@ -52,11 +54,11 @@ public class NumberSequenceIterator implements SplittableIterator<Long> {
 	/**
 	 * Internal constructor to allow for empty iterators.
 	 * 
-	 * @param from
-	 * @param to
-	 * @param mark
+	 * @param from The first number returned by the iterator.
+	 * @param to The last number returned by the iterator.
+	 * @param unused A dummy parameter to disambiguate the constructor.
 	 */
-	private NumberSequenceIterator(long from, long to, boolean mark) {
+	private NumberSequenceIterator(long from, long to, boolean unused) {
 		this.current = from;
 		this.to = to;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
index c3e6362..cfbd14e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SimpleStringUtils.java
@@ -16,14 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.util;
 
 import java.io.Serializable;
 
 import org.apache.flink.types.StringValue;
 
-
 /**
  * Utility class for efficient string operations on strings. All methods in this class are
  * written to be optimized for efficiency and work only on strings whose characters are

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
index a15c31c..7d50d12 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java
@@ -18,10 +18,17 @@
 
 package org.apache.flink.util;
 
+/**
+ * An exception, indicating that an {@link java.lang.Iterable} can only be traversed once, but an attempt
+ * was made to traverse it an additional time.
+ */
 public class TraversableOnceException extends RuntimeException {
 
 	private static final long serialVersionUID = 7636881584773577290L;
 
+	/**
+	 * Creates a new exception with a default message.
+	 */
 	public TraversableOnceException() {
 		super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
new file mode 100644
index 0000000..2f89a80
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.io;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.util.ReflectionUtil;
+
+/**
+ * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
+ */
+public class FormatUtil {
+
+
+	/**
+	 * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
+	 * initializes the format.
+	 * 
+	 * @param <T>
+	 *        the type of records produced by the InputFormat
+	 * @param inputFormatClass
+	 *        the class of the InputFormat
+	 * @param path
+	 *        the path of the file
+	 * @param configuration
+	 *        optional configuration of the InputFormat
+	 * @return the created {@link InputFormat}
+	 * @throws IOException
+	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
+	 */
+	public static <T, F extends FileInputFormat<T>> F openInput(
+			Class<F> inputFormatClass, String path, Configuration configuration)
+		throws IOException
+	{
+		configuration = configuration == null ? new Configuration() : configuration;
+
+		Path normalizedPath = normalizePath(new Path(path));
+		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
+
+		inputFormat.setFilePath(normalizedPath);
+		inputFormat.setOpenTimeout(0);
+		inputFormat.configure(configuration);
+
+		final FileSystem fs = FileSystem.get(normalizedPath.toUri());
+		FileStatus fileStatus = fs.getFileStatus(normalizedPath);
+
+		BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+		inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
+		return inputFormat;
+	}
+
+	/**
+	 * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
+	 * initializes the formats.
+	 * 
+	 * @param <T>
+	 *        the type of records produced by the InputFormat
+	 * @param inputFormatClass
+	 *        the class of the InputFormat
+	 * @param path
+	 *        the path of the file or to the directory containing the splits
+	 * @param configuration
+	 *        optional configuration of the InputFormat
+	 * @return the created {@link InputFormat}s for each file in the specified path
+	 * @throws IOException
+	 *         if an I/O error occurred while accessing the files or initializing the InputFormat.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
+			Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
+		Path nephelePath = new Path(path);
+		FileSystem fs = nephelePath.getFileSystem();
+		FileStatus fileStatus = fs.getFileStatus(nephelePath);
+		if (!fileStatus.isDir()) {
+			return Arrays.asList(openInput(inputFormatClass, path, configuration));
+		}
+		FileStatus[] list = fs.listStatus(nephelePath);
+		List<F> formats = new ArrayList<F>();
+		for (int index = 0; index < list.length; index++) {
+			formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
+		}
+		return formats;
+	}
+
+	/**
+	 * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
+	 * initializes the format.
+	 * 
+	 * @param <T>
+	 *        the type of records produced by the InputFormat
+	 * @param inputFormatClass
+	 *        the class of the InputFormat
+	 * @param configuration
+	 *        optional configuration of the InputFormat
+	 * @return the created {@link InputFormat}
+	 * @throws IOException
+	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
+	 */
+	public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
+			Class<F> inputFormatClass, Configuration configuration) throws IOException {
+		configuration = configuration == null ? new Configuration() : configuration;
+
+		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
+		inputFormat.configure(configuration);
+		final IS[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+		return inputFormat;
+	}
+	
+	/**
+	 * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
+	 * initializes the format.
+	 * 
+	 * @param <T>
+	 *        the type of records consumed by the OutputFormat
+	 * @param outputFormatClass
+	 *        the class of the OutputFormat
+	 * @param path
+	 *        the path of the file or to the directory containing the splits
+	 * @param configuration
+	 *        optional configuration of the OutputFormat
+	 * @return the created {@link OutputFormat}
+	 * @throws IOException
+	 *         if an I/O error occurred while accessing the file or initializing the OutputFormat.
+	 */
+	public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
+			Class<F> outputFormatClass, String path, Configuration configuration) 
+		throws IOException
+	{
+		final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
+		outputFormat.setOutputFilePath(new Path(path));
+		outputFormat.setWriteMode(WriteMode.OVERWRITE);
+	
+		configuration = configuration == null ? new Configuration() : configuration;
+		
+		outputFormat.configure(configuration);
+		outputFormat.open(0, 1);
+		return outputFormat;
+	}
+
+	/**
+	 * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
+	 */
+	private static Path normalizePath(Path path) {
+		URI uri = path.toUri();
+		if (uri.getScheme() == null) {
+			try {
+				uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
+				path = new Path(uri.toString());
+			} catch (URISyntaxException e) {
+				throw new IllegalArgumentException("path is invalid", e);
+			}
+		}
+		return path;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
new file mode 100644
index 0000000..b88fcc5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.Test;
+
+
+public class DefaultSplitAssignerTest {
+
+	@Test
+	public void testSerialSplitAssignment() {
+		try {
+			final int NUM_SPLITS = 50;
+			
+			Set<InputSplit> splits = new HashSet<InputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new GenericInputSplit(i, NUM_SPLITS));
+			}
+			
+			DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignment() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			Set<InputSplit> splits = new HashSet<InputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new GenericInputSplit(i, NUM_SPLITS));
+			}
+			
+			final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					String host = "";
+					GenericInputSplit split;
+					while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous, the thread did not respond in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit(""));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
new file mode 100644
index 0000000..aa56c1c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.util.LogUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class LocatableSplitAssignerTest {
+	
+	@Test
+	public void testSerialSplitAssignmentWithNullHost() {
+		try {
+			final int NUM_SPLITS = 50;
+			final String[][] hosts = new String[][] {
+					new String[] { "localhost" },
+					new String[0],
+					null
+			};
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%3]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit(null)) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentAllForSameHost() {
+		try {
+			final int NUM_SPLITS = 50;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, "testhost"));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("testhost")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentAllForRemoteHost() {
+		try {
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			final int NUM_SPLITS = 10 * hosts.length;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("testhost")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+			
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentMixedLocalHost() {
+		try {
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			final int NUM_SPLITS = 10 * hosts.length;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			int i = 0;
+			while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentNullHost() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			final String[][] hosts = new String[][] {
+					new String[] { "localhost" },
+					new String[0],
+					null
+			};
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%3]));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit(null)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous, the thread did not respond in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit(""));
+			
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentForSingleHost() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, "testhost"));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit("testhost")) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous, the thread did not respond in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit("testhost"));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentForMultipleHosts() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%hosts.length]));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					final String threadHost = hosts[(int) (Math.random() * hosts.length)];
+					
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit(threadHost)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous, the thread did not respond in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit("testhost"));
+			
+			// at least one fraction of hosts needs be local, no matter how bad the thread races
+			assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
index 3518927..a5d2b91 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.api.java.record.io;
 import java.io.IOException;
 
 import org.junit.Assert;
-
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
 import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
@@ -286,8 +286,8 @@ private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format;
 		}
 
 		@Override
-		public Class<GenericInputSplit> getInputSplitType() {
-			return GenericInputSplit.class;
+		public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+			return new DefaultInputSplitAssigner(splits);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
index 1b86ecb..96f6664 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
@@ -16,13 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.io;
 
 import java.io.IOException;
 
 import org.junit.Assert;
-
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
 import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
@@ -225,10 +224,10 @@ public class ExternalProcessInputFormatTest {
 		}
 
 		@Override
-		public Class<GenericInputSplit> getInputSplitType() {
-			return GenericInputSplit.class;
+		public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
+			return new DefaultInputSplitAssigner(splits);
 		}
-
+		
 		@Override
 		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
index 2ff49e0..7ec5a6a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorInputFormatsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -141,7 +142,7 @@ public class TypeExtractorInputFormatsTest {
 		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
 
 		@Override
-		public Class<? extends InputSplit> getInputSplitType() { return null; }
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
 
 		@Override
 		public void open(InputSplit split) {}
@@ -211,7 +212,7 @@ public class TypeExtractorInputFormatsTest {
 		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
 
 		@Override
-		public Class<? extends InputSplit> getInputSplitType() { return null; }
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
 
 		@Override
 		public void open(InputSplit split) {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index c187961..1fa9491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -41,6 +41,7 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 
 	/** The size of the ID in byte */
 	public static final int SIZE = 2 * SIZE_OF_LONG;
+	
 
 	/** The upper part of the actual ID */
 	private long upperPart;
@@ -48,11 +49,13 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 	/** The lower part of the actual ID */
 	private long lowerPart;
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Constructs a new ID with a specific bytes value.
 	 */
 	public AbstractID(byte[] bytes) {
-		if (bytes.length != SIZE) {
+		if (bytes == null || bytes.length != SIZE) {
 			throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
 		}
 
@@ -80,6 +83,9 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 	 * @param id the abstract ID to copy
 	 */
 	public AbstractID(AbstractID id) {
+		if (id == null) {
+			throw new IllegalArgumentException("Id must not be null.");
+		}
 		this.lowerPart = id.lowerPart;
 		this.upperPart = id.upperPart;
 	}
@@ -91,7 +97,19 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 		this.lowerPart = generateRandomLong();
 		this.upperPart = generateRandomLong();
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public long getLowerPart() {
+		return lowerPart;
+	}
+	
+	public long getUpperPart() {
+		return upperPart;
+	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Generates a uniformly distributed random positive long.
 	 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
new file mode 100644
index 0000000..c4a4211
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/JobException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+public class JobException extends Exception {
+
+	private static final long serialVersionUID = 1275864691743020176L;
+
+	public JobException(String msg) {
+		super(msg);
+	}
+
+	public JobException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
deleted file mode 100644
index 4960d80..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/JobSubmissionException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime;
-
-/**
- * A job submission exception is thrown if an error occurs while submitting
- * a job from the client to the job manager.
- * 
- */
-public class JobSubmissionException extends Exception {
-
-	/**
-	 * Generated serial UID
-	 */
-	private static final long serialVersionUID = 1275864691743020176L;
-
-	/**
-	 * Constructs a new job submission exception with the given error message.
-	 * 
-	 * @param msg
-	 *        the error message to be transported through this exception
-	 */
-	public JobSubmissionException(String msg) {
-		super(msg);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
index 2ca8b68..416453f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/AbstractJobResult.java
@@ -37,18 +37,13 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 
 	/**
 	 * The possible return codes for a job operation.
-	 * 
 	 */
 	public enum ReturnCode {
 
-		/**
-		 * The success return code.
-		 */
+		/** The success return code. */
 		SUCCESS,
 
-		/**
-		 * The error return code.
-		 */
+		/** The error return code. */
 		ERROR
 	};
 
@@ -60,7 +55,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 	/**
 	 * An optional description which can provide further information in case of an error.
 	 */
-	private String description = null;
+	private String description = "";
 
 	/**
 	 * Constructs a new abstract job result object and sets the description.
@@ -70,7 +65,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 	 * @param description
 	 *        the optional error description
 	 */
-	public AbstractJobResult(final ReturnCode returnCode, final String description) {
+	public AbstractJobResult(ReturnCode returnCode, String description) {
 		this.returnCode = returnCode;
 		this.description = description;
 	}
@@ -79,8 +74,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 	 * Construct a new abstract job result object. This constructor is required
 	 * for the deserialization process.
 	 */
-	public AbstractJobResult() {
-	}
+	public AbstractJobResult() {}
 
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 79da72f..028713d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.client;
 
 import java.io.IOException;
@@ -45,29 +44,19 @@ import org.apache.flink.util.StringUtils;
 
 /**
  * The job client is able to submit, control, and abort jobs.
- * <p>
- * This class is thread-safe.
  */
 public class JobClient {
 
-	/**
-	 * The logging object used for debugging.
-	 */
+	/** The logging object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
 
-	/**
-	 * The job management server stub.
-	 */
+	/** The job management server stub. */
 	private final JobManagementProtocol jobSubmitClient;
 
-	/**
-	 * The accumulator protocol stub to request accumulators from JobManager
-	 */
+	/** The accumulator protocol stub to request accumulators from the JobManager. */
 	private AccumulatorProtocol accumulatorProtocolProxy;
 
-	/**
-	 * The job graph assigned with this job client.
-	 */
+	/** The job graph assigned with this job client. */
 	private final JobGraph jobGraph;
 
 	/**
@@ -121,7 +110,7 @@ public class JobClient {
 
 	/**
 	 * Constructs a new job client object and instantiates a local
-	 * RPC proxy for the {@link JobSubmissionProtocol}.
+	 * RPC proxy for the JobSubmissionProtocol.
 	 * 
 	 * @param jobGraph
 	 *        the job graph to run
@@ -134,7 +123,7 @@ public class JobClient {
 
 	/**
 	 * Constructs a new job client object and instantiates a local
-	 * RPC proxy for the {@link JobSubmissionProtocol}.
+	 * RPC proxy for the JobSubmissionProtocol.
 	 * 
 	 * @param jobGraph
 	 *        the job graph to run
@@ -160,7 +149,7 @@ public class JobClient {
 
 	/**
 	 * Constructs a new job client object and instantiates a local
-	 * RPC proxy for the {@link JobSubmissionProtocol}.
+	 * RPC proxy for the JobSubmissionProtocol.
 	 * 
 	 * @param jobGraph
 	 *        the job graph to run
@@ -335,7 +324,7 @@ public class JobClient {
 				if (event instanceof JobEvent) {
 					final JobEvent jobEvent = (JobEvent) event;
 					final JobStatus jobStatus = jobEvent.getCurrentJobStatus();
-					if (jobStatus == JobStatus.SCHEDULED) {
+					if (jobStatus == JobStatus.RUNNING) {
 						startTimestamp = jobEvent.getTimestamp();
 					}
 					if (jobStatus == JobStatus.FINISHED) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
index c755dc6..53319d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionResult.java
@@ -16,14 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.client;
 
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
 /**
  * A <code>JobSubmissionResult</code> is used to report the results
  * of a job submission. It contains a return code and a description.
@@ -53,17 +47,4 @@ public class JobSubmissionResult extends AbstractJobResult {
 	public JobSubmissionResult() {
 		super();
 	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		super.read(in);
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		super.write(out);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
index f89e999..b4a38f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.deployment;
 
 import java.io.IOException;
@@ -24,43 +23,30 @@ import java.io.IOException;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 
 /**
  * A channel deployment descriptor contains all the information necessary to deploy either an input or an output channel
  * as part of a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- * 
  */
 public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 
-	/**
-	 * The ID of the output channel.
-	 */
+	/** The ID of the output channel. */
 	private final ChannelID outputChannelID;
 
-	/**
-	 * The ID of the input channel.
-	 */
+	/** The ID of the input channel. */
 	private final ChannelID inputChannelID;
 
 	/**
 	 * Constructs a new channel deployment descriptor.
 	 * 
-	 * @param outputChannelID
-	 *        the ID of the output channel
-	 * @param inputChannelID
-	 *        the ID of the input channel
+	 * @param outputChannelID The ID of the output channel
+	 * @param inputChannelID The ID of the input channel
 	 */
-	public ChannelDeploymentDescriptor(final ChannelID outputChannelID, final ChannelID inputChannelID) {
-
-		if (outputChannelID == null) {
-			throw new IllegalArgumentException("Argument outputChannelID must not be null");
-		}
-
-		if (inputChannelID == null) {
-			throw new IllegalArgumentException("Argument inputChannelID must not be null");
+	public ChannelDeploymentDescriptor(ChannelID outputChannelID, ChannelID inputChannelID) {
+		if (outputChannelID == null || inputChannelID == null) {
+			throw new IllegalArgumentException("Channel IDs must not be null");
 		}
 
 		this.outputChannelID = outputChannelID;
@@ -71,23 +57,20 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 	 * Default constructor for serialization/deserialization.
 	 */
 	public ChannelDeploymentDescriptor() {
-
 		this.outputChannelID = new ChannelID();
 		this.inputChannelID = new ChannelID();
 	}
 
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
+	public void write(DataOutputView out) throws IOException {
 		this.outputChannelID.write(out);
 		this.inputChannelID.write(out);
 	}
 
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
+	public void read(DataInputView in) throws IOException {
 		this.outputChannelID.read(in);
 		this.inputChannelID.read(in);
 	}
@@ -98,7 +81,6 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 	 * @return the output channel ID attached to this deployment descriptor
 	 */
 	public ChannelID getOutputChannelID() {
-
 		return this.outputChannelID;
 	}
 
@@ -108,7 +90,12 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 	 * @return the input channel ID attached to this deployment descriptor
 	 */
 	public ChannelID getInputChannelID() {
-
 		return this.inputChannelID;
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static ChannelDeploymentDescriptor fromExecutionEdge(ExecutionEdge2 edge) {
+		return new ChannelDeploymentDescriptor(edge.getOutputChannelId(), edge.getInputChannelId());
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
index dfec497..e4a447f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
@@ -16,74 +16,36 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.deployment;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.util.EnumUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
 
 /**
- * A gate deployment descriptor contains all the information necessary to deploy either an input or an output gate as
- * part of a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- * 
+ * A gate deployment descriptor contains the deployment descriptors for the channels associated with that gate.
  */
 public final class GateDeploymentDescriptor implements IOReadableWritable {
 
-	/**
-	 * The ID of the gate.
-	 */
-	private final GateID gateID;
-
-	/**
-	 * The channel type of the gate.
-	 */
-	private ChannelType channelType;
-
-	/**
-	 * The list of channel deployment descriptors attached to this gate.
-	 */
+	/** The list of channel deployment descriptors attached to this gate. */
 	private final List<ChannelDeploymentDescriptor> channels;
 
 	/**
 	 * Constructs a new gate deployment descriptor
 	 * 
-	 * @param gateID
-	 *        the ID of the gate
-	 * @param channelType
-	 *        the channel type of the gate
-	 * @param compressionLevel
-	 *        the compression level of the gate
 	 * @param channels
 	 *        the list of channel deployment descriptors attached to this gate
 	 */
-	public GateDeploymentDescriptor(final GateID gateID, final ChannelType channelType,
-			List<ChannelDeploymentDescriptor> channels) {
-
-		if (gateID == null) {
-			throw new IllegalArgumentException("Argument gateID must no be null");
-		}
-
-		if (channelType == null) {
-			throw new IllegalArgumentException("Argument channelType must no be null");
-		}
-
+	public GateDeploymentDescriptor(List<ChannelDeploymentDescriptor> channels) {
 		if (channels == null) {
-			throw new IllegalArgumentException("Argument channels must no be null");
+			throw new NullPointerException();
 		}
 
-		this.gateID = gateID;
-		this.channelType = channelType;
 		this.channels = channels;
 	}
 
@@ -91,71 +53,49 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
 	 * Default constructor for serialization/deserialization.
 	 */
 	public GateDeploymentDescriptor() {
-
-		this.gateID = new GateID();
-		this.channelType = null;
 		this.channels = new ArrayList<ChannelDeploymentDescriptor>();
 	}
 
-
+	
+	public List<ChannelDeploymentDescriptor> getChannels() {
+		return channels;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	@Override
 	public void write(final DataOutputView out) throws IOException {
-
-		this.gateID.write(out);
-		EnumUtils.writeEnum(out, channelType);
 		out.writeInt(this.channels.size());
-		final Iterator<ChannelDeploymentDescriptor> it = this.channels.iterator();
-		while (it.hasNext()) {
-			it.next().write(out);
+		for (ChannelDeploymentDescriptor cdd : this.channels) {
+			cdd.write(out);
 		}
 	}
 
-
 	@Override
 	public void read(final DataInputView in) throws IOException {
-
-		this.gateID.read(in);
-		this.channelType = EnumUtils.readEnum(in, ChannelType.class);
 		final int nocdd = in.readInt();
 		for (int i = 0; i < nocdd; ++i) {
-			final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor();
+			ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor();
 			cdd.read(in);
 			this.channels.add(cdd);
 		}
 	}
-
-	/**
-	 * Returns the ID of the gate.
-	 * 
-	 * @return the ID of the gate
-	 */
-	public GateID getGateID() {
-
-		return this.gateID;
-	}
-
-	/**
-	 * Returns the channel type of the gate.
-	 * 
-	 * @return the channel type of the gate
-	 */
-	public ChannelType getChannelType() {
-
-		return this.channelType;
-	}
-
-	/**
-	 * Returns the number of channel deployment descriptors attached to this gate descriptor.
-	 * 
-	 * @return the number of channel deployment descriptors
-	 */
-	public int getNumberOfChannelDescriptors() {
-
-		return this.channels.size();
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static GateDeploymentDescriptor fromEdges(List<ExecutionEdge2> edges) {
+		List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.size());
+		for (ExecutionEdge2 edge : edges) {
+			channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
+		}
+		return new GateDeploymentDescriptor(channels);
 	}
-
-	public ChannelDeploymentDescriptor getChannelDescriptor(final int index) {
-
-		return this.channels.get(index);
+	
+	public static GateDeploymentDescriptor fromEdges(ExecutionEdge2[] edges) {
+		List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.length);
+		for (ExecutionEdge2 edge : edges) {
+			channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
+		}
+		return new GateDeploymentDescriptor(channels);
 	}
 }