Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:51 UTC

[27/63] [abbrv] git commit: [FLINK-1094] Reworked, improved, and tested split assigners

[FLINK-1094] Reworked, improved, and tested split assigners


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c32569ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c32569ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c32569ae

Branch: refs/heads/master
Commit: c32569aed12ffa968e2c2289c2d56db262c0eba4
Parents: 028fcf5
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 18 04:22:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/BlockLocation.java |  10 +-
 .../flink/core/fs/FileChannelWrapper.java       | 239 ------------
 .../apache/flink/core/fs/FileInputSplit.java    | 148 +++-----
 .../apache/flink/core/io/GenericInputSplit.java |  39 +-
 .../org/apache/flink/core/io/InputSplit.java    |   5 +-
 .../flink/core/io/InputSplitAssigner.java       |  35 ++
 .../flink/core/io/LocatableInputSplit.java      |  96 +++--
 .../DefaultInputSplitAssigner.java              |  96 ++---
 .../splitassigner/InputSplitAssigner.java       |  59 ---
 .../splitassigner/InputSplitManager.java        | 266 -------------
 .../splitassigner/InputSplitTracker.java        | 166 --------
 .../splitassigner/InputSplitWrapper.java        |  20 +-
 .../LocatableInputSplitAssigner.java            | 225 +++++++----
 .../splitassigner/LocatableInputSplitList.java  | 211 -----------
 .../file/FileInputSplitAssigner.java            | 119 ------
 .../splitassigner/file/FileInputSplitList.java  | 209 ----------
 .../instance/DefaultInstanceManagerTest.java    |   4 -
 .../instance/LocalInstanceManagerTest.java      |   9 +-
 .../splitassigner/DefaultSplitAssignerTest.java | 121 ++++++
 .../LocatableSplitAssignerTest.java             | 379 +++++++++++++++++++
 20 files changed, 883 insertions(+), 1573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
index b557c36..14056ab 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
@@ -16,22 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.fs;
 
 import java.io.IOException;
 
 /**
- * A BlockLocation lists hosts, offset and length
- * of block.
- * 
+ * A BlockLocation lists the hosts, offset, and length of a block.
  */
 public interface BlockLocation extends Comparable<BlockLocation> {
 
 	/**
 	 * Get the list of hosts (hostname) hosting this block.
 	 * 
-	 * @return a list of hosts (hostname) hosting this block
+	 * @return A list of hosts (hostname) hosting this block.
 	 * @throws IOException
 	 *         thrown if the list of hosts could not be retrieved
 	 */
@@ -40,7 +37,7 @@ public interface BlockLocation extends Comparable<BlockLocation> {
 	/**
 	 * Get the start offset of the file associated with this block.
 	 * 
-	 * @return the start offset of the file associated with this block
+	 * @return The start offset of the file associated with this block.
 	 */
 	long getOffset();
 
@@ -50,5 +47,4 @@ public interface BlockLocation extends Comparable<BlockLocation> {
 	 * @return the length of the block
 	 */
 	long getLength();
-
 }
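
A minimal usage sketch for the interface above, assuming a reachable file system behind a made-up path and Flink's FileSystem/FileStatus API for obtaining the block locations (neither appears in this diff); it simply prints the three properties documented above.

    import java.util.Arrays;

    import org.apache.flink.core.fs.BlockLocation;
    import org.apache.flink.core.fs.FileStatus;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class BlockLocationExample {

        public static void main(String[] args) throws Exception {
            // hypothetical input file, replace with an existing one
            Path file = new Path("hdfs:///data/input.txt");
            FileSystem fs = file.getFileSystem();
            FileStatus status = fs.getFileStatus(file);

            // one BlockLocation per block of the file, covering its whole length
            BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
            for (BlockLocation block : blocks) {
                System.out.println("offset=" + block.getOffset()
                        + ", length=" + block.getLength()
                        + ", hosts=" + Arrays.toString(block.getHosts()));
            }
        }
    }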

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
deleted file mode 100644
index bf87642..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileChannelWrapper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.core.fs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-public final class FileChannelWrapper extends FileChannel {
-
-	private final FileSystem fs;
-
-	private final Path checkpointFile;
-
-	private final byte[] buf;
-
-	private final short replication;
-
-	private FSDataOutputStream outputStream = null;
-
-	private FSDataInputStream inputStream = null;
-
-	private long nextExpectedWritePosition = 0L;
-
-	private long nextExpectedReadPosition = 0L;
-
-	public FileChannelWrapper(final FileSystem fs, final Path checkpointFile, final int bufferSize,
-			final short replication) {
-
-		this.fs = fs;
-		this.checkpointFile = checkpointFile;
-		this.buf = new byte[bufferSize];
-		this.replication = replication;
-	}
-
-
-	@Override
-	public void force(final boolean metaData) throws IOException {
-
-		throw new UnsupportedOperationException("Method force is not implemented");
-	}
-
-
-	@Override
-	public FileLock lock(final long position, final long size, final boolean shared) throws IOException {
-
-		throw new UnsupportedOperationException("Method lock is not implemented");
-	}
-
-
-	@Override
-	public MappedByteBuffer map(final MapMode mode, final long position, final long size) throws IOException {
-
-		throw new UnsupportedOperationException("Method map is not implemented");
-	}
-
-
-	@Override
-	public long position() throws IOException {
-
-		throw new UnsupportedOperationException("Method position is not implemented");
-	}
-
-
-	@Override
-	public FileChannel position(final long newPosition) throws IOException {
-
-		throw new UnsupportedOperationException("Method position is not implemented");
-	}
-
-
-	@Override
-	public int read(final ByteBuffer dst) throws IOException {
-
-		throw new UnsupportedOperationException("Method read is not implemented");
-	}
-
-
-	@Override
-	public int read(final ByteBuffer dst, final long position) throws IOException {
-
-		final int length = Math.min(this.buf.length, dst.remaining());
-
-		final FSDataInputStream inputStream = getInputStream();
-		if (position != this.nextExpectedReadPosition) {
-			System.out.println("Next expected position is " + this.nextExpectedReadPosition + ", seeking to "
-				+ position);
-			inputStream.seek(position);
-			this.nextExpectedReadPosition = position;
-		}
-
-		final int bytesRead = inputStream.read(this.buf, 0, length);
-		if (bytesRead == -1) {
-			return -1;
-		}
-
-		dst.put(this.buf, 0, length);
-
-		this.nextExpectedReadPosition += bytesRead;
-
-		return bytesRead;
-	}
-
-
-	@Override
-	public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
-
-		throw new UnsupportedOperationException("Method read is not implemented");
-	}
-
-
-	@Override
-	public long size() throws IOException {
-
-		throw new UnsupportedOperationException("Method size is not implemented");
-	}
-
-
-	@Override
-	public long transferFrom(final ReadableByteChannel src, final long position, final long count) throws IOException {
-
-		throw new UnsupportedOperationException("Method transferFrom is not implemented");
-	}
-
-
-	@Override
-	public long transferTo(final long position, final long count, final WritableByteChannel target) throws IOException {
-
-		throw new UnsupportedOperationException("Method transferTo is not implemented");
-	}
-
-
-	@Override
-	public FileChannel truncate(final long size) throws IOException {
-
-		throw new UnsupportedOperationException("Method truncate is not implemented");
-	}
-
-
-	@Override
-	public FileLock tryLock(final long position, final long size, final boolean shared) throws IOException {
-
-		throw new UnsupportedOperationException("Method tryLock is not implemented");
-	}
-
-
-	@Override
-	public int write(final ByteBuffer src) throws IOException {
-
-		return write(src, this.nextExpectedWritePosition);
-	}
-
-
-	@Override
-	public int write(final ByteBuffer src, final long position) throws IOException {
-
-		if (position != this.nextExpectedWritePosition) {
-			throw new IOException("Next expected write position is " + this.nextExpectedWritePosition);
-		}
-
-		final FSDataOutputStream outputStream = getOutputStream();
-
-		int totalBytesWritten = 0;
-
-		while (src.hasRemaining()) {
-
-			final int length = Math.min(this.buf.length, src.remaining());
-			src.get(this.buf, 0, length);
-			outputStream.write(this.buf, 0, length);
-			totalBytesWritten += length;
-		}
-
-		this.nextExpectedWritePosition += totalBytesWritten;
-
-		return totalBytesWritten;
-	}
-
-
-	@Override
-	public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
-
-		throw new UnsupportedOperationException("Method write is not implemented");
-	}
-
-	private FSDataOutputStream getOutputStream() throws IOException {
-
-		if (this.outputStream == null) {
-			this.outputStream = this.fs.create(this.checkpointFile, false, this.buf.length, this.replication,
-				this.fs.getDefaultBlockSize());
-		}
-
-		return this.outputStream;
-	}
-
-	private FSDataInputStream getInputStream() throws IOException {
-
-		if (this.inputStream == null) {
-			this.inputStream = this.fs.open(this.checkpointFile, this.buf.length);
-		}
-
-		return this.inputStream;
-	}
-
-
-	@Override
-	protected void implCloseChannel() throws IOException {
-
-		if (this.outputStream != null) {
-			this.outputStream.close();
-			this.outputStream = null;
-		}
-
-		if (this.inputStream != null) {
-			this.inputStream.close();
-			this.inputStream = null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index 40058c0..c4a69ba 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -16,28 +16,21 @@
  * limitations under the License.
  */
 
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
 package org.apache.flink.core.fs;
 
 import java.io.IOException;
 
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * A file input split provides information on a particular part of a file, possibly
- * hosted on a distributed file system and replicated among several hosts.
- * 
+ * hosted on a distributed file system and replicated among several hosts. 
  */
-public class FileInputSplit implements InputSplit {
+public class FileInputSplit extends LocatableInputSplit {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * The path of the file this file split refers to.
@@ -54,16 +47,8 @@ public class FileInputSplit implements InputSplit {
 	 */
 	private long length;
 
-	/**
-	 * List of hosts (hostnames) containing the block, possibly <code>null</code>.
-	 */
-	private String[] hosts;
-
-	/**
-	 * The logical number of the split.
-	 */
-	private int partitionNumber;
-
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Constructs a split with host information.
 	 * 
@@ -78,20 +63,21 @@ public class FileInputSplit implements InputSplit {
 	 * @param hosts
 	 *        the list of hosts containing the block, possibly <code>null</code>
 	 */
-	public FileInputSplit(final int num, final Path file, final long start, final long length, final String[] hosts) {
-		this.partitionNumber = num;
+	public FileInputSplit(int num, Path file, long start, long length, String[] hosts) {
+		super(num, hosts);
+		
 		this.file = file;
 		this.start = start;
 		this.length = length;
-		this.hosts = hosts;
 	}
 
 	/**
-	 * Constructor used to reconstruct the object at the receiver of an RPC call.
+	 * Default constructor for deserialization.
 	 */
-	public FileInputSplit() {
-	}
+	public FileInputSplit() {}
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Returns the path of the file containing this split's data.
 	 * 
@@ -118,35 +104,17 @@ public class FileInputSplit implements InputSplit {
 	public long getLength() {
 		return length;
 	}
-
-	/**
-	 * Gets the names of the hosts that this file split resides on.
-	 * 
-	 * @return The names of the hosts that this file split resides on.
-	 */
-	public String[] getHostNames() {
-		if (this.hosts == null) {
-			return new String[] {};
-		} else {
-			return this.hosts;
-		}
-	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public int getSplitNumber() {
-		return this.partitionNumber;
-	}
-
-	@Override
-	public String toString() {
-		return "[" + this.partitionNumber + "] " + file + ":" + start + "+" + length;
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		// write partition number
-		out.writeInt(this.partitionNumber);
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
 
+		// write start and length
+		out.writeLong(this.start);
+		out.writeLong(this.length);
+		
 		// write file
 		if (this.file != null) {
 			out.writeBoolean(true);
@@ -154,48 +122,56 @@ public class FileInputSplit implements InputSplit {
 		} else {
 			out.writeBoolean(false);
 		}
-
-		// write start and length
-		out.writeLong(this.start);
-		out.writeLong(this.length);
-
-		// write hosts
-		if (this.hosts == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			out.writeInt(this.hosts.length);
-			for (int i = 0; i < this.hosts.length; i++) {
-				StringRecord.writeString(out, this.hosts[i]);
-			}
-		}
 	}
 
-
 	@Override
-	public void read(final DataInputView in) throws IOException {
-		// read partition number
-		this.partitionNumber = in.readInt();
-
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+		
+		this.start = in.readLong();
+		this.length = in.readLong();
+		
 		// read file path
 		boolean isNotNull = in.readBoolean();
 		if (isNotNull) {
 			this.file = new Path();
 			this.file.read(in);
 		}
-
-		this.start = in.readLong();
-		this.length = in.readLong();
-
-		isNotNull = in.readBoolean();
-		if (isNotNull) {
-			final int numHosts = in.readInt();
-			this.hosts = new String[numHosts];
-			for (int i = 0; i < numHosts; i++) {
-				this.hosts[i] = StringRecord.readString(in);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return getSplitNumber() ^ (file == null ? 0 : file.hashCode());
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj != null && super.equals(obj) && obj instanceof FileInputSplit) {
+			FileInputSplit other = (FileInputSplit) obj;
+			
+			if (this.file != null) {
+				if (!this.file.equals(other.file)) {
+					return false;
+				}
 			}
-		} else {
-			this.hosts = null;
+			else if (other.file != null) {
+				return false;
+			}
+			
+			return this.start == other.start && this.length == other.length;
+		}
+		else {
+			return false;
 		}
 	}
+	
+	@Override
+	public String toString() {
+		return "[" + getSplitNumber() + "] " + file + ":" + start + "+" + length;
+	}
 }
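
With this change the host information of a FileInputSplit is handled by the LocatableInputSplit base class. A small hypothetical sketch of how such a split is constructed and queried (path, offsets, and host names are made up):

    import java.util.Arrays;

    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class FileInputSplitExample {

        public static void main(String[] args) {
            // split number 0, covering the first 64 MB of a made-up file, stored on two hosts
            FileInputSplit split = new FileInputSplit(0, new Path("hdfs:///data/input.txt"),
                    0L, 64L * 1024 * 1024, new String[] { "host1", "host2" });

            System.out.println(split.getSplitNumber());                 // 0
            System.out.println(split.getLength());                      // 67108864
            System.out.println(Arrays.toString(split.getHostnames()));  // inherited from LocatableInputSplit
            System.out.println(split);                                  // formatted by the toString() shown above
        }
    }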

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index ceecfa5..850ba1c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.io;
 
 import java.io.IOException;
@@ -27,17 +26,19 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * A generic input split that has only a partition number.
  */
-public class GenericInputSplit implements InputSplit {
+public class GenericInputSplit implements InputSplit, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * The number of this split.
 	 */
-	protected int partitionNumber;
+	private int partitionNumber;
 
 	/**
 	 * The total number of partitions
 	 */
-	protected int totalNumberOfPartitions;
+	private int totalNumberOfPartitions;
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -60,6 +61,17 @@ public class GenericInputSplit implements InputSplit {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
+	public int getSplitNumber() {
+		return this.partitionNumber;
+	}
+	
+	public int getTotalNumberOfSplits() {
+		return this.totalNumberOfPartitions;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.partitionNumber);
 		out.writeInt(this.totalNumberOfPartitions);
@@ -70,16 +82,25 @@ public class GenericInputSplit implements InputSplit {
 		this.partitionNumber = in.readInt();
 		this.totalNumberOfPartitions = in.readInt();
 	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public int getSplitNumber() {
-		return this.partitionNumber;
+	public int hashCode() {
+		return this.partitionNumber ^ this.totalNumberOfPartitions;
 	}
 	
-	public int getTotalNumberOfSplits() {
-		return this.totalNumberOfPartitions;
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof GenericInputSplit) {
+			GenericInputSplit other = (GenericInputSplit) obj;
+			return this.partitionNumber == other.partitionNumber &&
+					this.totalNumberOfPartitions == other.totalNumberOfPartitions;
+		} else {
+			return false;
+		}
 	}
-
+	
 	public String toString() {
 		return "GenericSplit (" + this.partitionNumber + "/" + this.totalNumberOfPartitions + ")";
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
index 5f09d10..5ce0378 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.io;
 
-
 /**
  * This interface must be implemented by all kind of input splits that can be assigned to input formats.
  */
-public interface InputSplit extends IOReadableWritable {
+public interface InputSplit extends IOReadableWritable, java.io.Serializable {
+	
 	/**
 	 * Returns the number of this input split.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
new file mode 100644
index 0000000..f01c9f3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * An input split assigner distributes the {@link InputSplit}s among the instances on which a
+ * data source exists.
+ */
+public interface InputSplitAssigner {
+
+	/**
+	 * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
+	 * to allow localized assignments.
+	 * 
+	 * @param host The address of the host to assign the split to.
+	 * @return the next input split to be consumed, or <code>null</code> if no more splits remain.
+	 */
+	InputSplit getNextInputSplit(String host);
+}
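
The new interface reduces split assignment to a single pull-style method: a consumer identified by its host asks for the next split and receives null once all splits are handed out. The class below is a hypothetical illustration (not part of this commit) that implements the contract while ignoring locality, handing splits out in FIFO order:

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Deque;

    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    public class FifoSplitAssigner implements InputSplitAssigner {

        private final Deque<InputSplit> splits = new ArrayDeque<InputSplit>();

        public FifoSplitAssigner(Collection<? extends InputSplit> splits) {
            this.splits.addAll(splits);
        }

        @Override
        public InputSplit getNextInputSplit(String host) {
            // the host is ignored here; locality-aware assigners use it to prefer local splits
            synchronized (splits) {
                return splits.pollFirst(); // null once all splits are consumed, as the contract requires
            }
        }
    }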

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index 9a4e366..cc36d99 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.io;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
 /**
  * A locatable input split is an input split referring to input data which is located on one or more hosts.
  */
-public class LocatableInputSplit implements InputSplit {
+public class LocatableInputSplit implements InputSplit, java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
 
+	private static final String[] EMPTY_ARR = new String[0];
+	
 	/**
 	 * The number of the split.
 	 */
@@ -40,6 +43,8 @@ public class LocatableInputSplit implements InputSplit {
 	 */
 	private String[] hostnames;
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Creates a new locatable input split.
 	 * 
@@ -48,73 +53,86 @@ public class LocatableInputSplit implements InputSplit {
 	 * @param hostnames
 	 *        the names of the hosts storing the data this input split refers to
 	 */
-	public LocatableInputSplit(final int splitNumber, final String[] hostnames) {
-
-		this.hostnames = hostnames;
+	public LocatableInputSplit(int splitNumber, String[] hostnames) {
+		this.splitNumber = splitNumber;
+		this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
+	}
+	
+	public LocatableInputSplit(int splitNumber, String hostname) {
+		this.splitNumber = splitNumber;
+		this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname };
 	}
 
 	/**
 	 * Default constructor for serialization/deserialization.
 	 */
-	public LocatableInputSplit() {
-	}
+	public LocatableInputSplit() {}
 
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+	
 	/**
 	 * Returns the names of the hosts storing the data this input split refers to
 	 * 
 	 * @return the names of the hosts storing the data this input split refers to
 	 */
 	public String[] getHostnames() {
-
-		if (this.hostnames == null) {
-			return new String[] {};
-		}
-
 		return this.hostnames;
 	}
 
+	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		// Write the split number
+	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.splitNumber);
-
-		// Write hostnames
-		if (this.hostnames == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			out.writeInt(this.hostnames.length);
-			for (int i = 0; i < this.hostnames.length; i++) {
-				StringRecord.writeString(out, this.hostnames[i]);
-			}
+		out.writeInt(this.hostnames.length);
+		for (int i = 0; i < this.hostnames.length; i++) {
+			StringRecord.writeString(out, this.hostnames[i]);
 		}
 	}
 
-
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		// Read the split number
+	public void read(DataInputView in) throws IOException {
 		this.splitNumber = in.readInt();
 
-		// Read hostnames
-		if (in.readBoolean()) {
-			final int numHosts = in.readInt();
+		final int numHosts = in.readInt();
+		if (numHosts == 0) {
+			this.hostnames = EMPTY_ARR;
+		} else {
 			this.hostnames = new String[numHosts];
 			for (int i = 0; i < numHosts; i++) {
 				this.hostnames[i] = StringRecord.readString(in);
 			}
-		} else {
-			this.hostnames = null;
 		}
 	}
-
-
+	
+	// --------------------------------------------------------------------------------------------
+	
 	@Override
-	public int getSplitNumber() {
-
+	public int hashCode() {
 		return this.splitNumber;
 	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj != null && obj instanceof LocatableInputSplit) {
+			LocatableInputSplit other = (LocatableInputSplit) obj;
+			return other.splitNumber == this.splitNumber && Arrays.deepEquals(other.hostnames, this.hostnames);
+		}
+		else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames);
+	}
 }
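
A short hypothetical illustration of the reworked class above: null host arrays are normalized to an empty array, hashCode() is the split number, and equals() compares the split number and the host names.

    import java.util.Arrays;

    import org.apache.flink.core.io.LocatableInputSplit;

    public class LocatableInputSplitExample {

        public static void main(String[] args) {
            LocatableInputSplit a = new LocatableInputSplit(3, new String[] { "host1", "host2" });
            LocatableInputSplit b = new LocatableInputSplit(3, new String[] { "host1", "host2" });
            LocatableInputSplit noHosts = new LocatableInputSplit(4, (String[]) null);

            System.out.println(a.equals(b));                     // true: same split number and hosts
            System.out.println(a.hashCode() == b.hashCode());    // true: the hash code is the split number
            System.out.println(noHosts.getHostnames().length);   // 0: null was mapped to the empty array
            System.out.println(Arrays.toString(a.getHostnames()));
        }
    }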

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
index 916772a..44dae64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
@@ -16,88 +16,60 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.splitassigner;
 
-import java.util.Arrays;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.core.io.InputSplitAssigner;
 
 /**
  * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
- * simply returns all input splits of an input vertex in the order they were originally computed. The default input
- * split assigner is always used when a more specific {@link InputSplitAssigned} could not be found.
- * <p>
- * This class is thread-safe.
- * 
+ * simply hands out the input splits of an input vertex one after the other, without considering locality.
  */
 public class DefaultInputSplitAssigner implements InputSplitAssigner {
 
-	/**
-	 * The logging object used to report information and errors.
-	 */
+	/** The logging object used to report information and errors. */
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);
 
-	/**
-	 * The split map stores a list of all input splits that still must be consumed by a specific input vertex.
-	 */
-	private final ConcurrentMap<ExecutionGroupVertex, Queue<InputSplit>> splitMap = new ConcurrentHashMap<ExecutionGroupVertex, Queue<InputSplit>>();
-
-
-	@Override
-	public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
-
-		final InputSplit[] inputSplits = groupVertex.getInputSplits();
+	/** The list of all splits */
+	private final List<InputSplit> splits = new ArrayList<InputSplit>();
 
-		if (inputSplits == null) {
-			return;
-		}
-
-		if (inputSplits.length == 0) {
-			return;
-		}
-
-		final Queue<InputSplit> queue = new ConcurrentLinkedQueue<InputSplit>();
-		if (this.splitMap.putIfAbsent(groupVertex, queue) != null) {
-			LOG.error("Group vertex " + groupVertex.getName() + " already has a split queue");
-		}
 
-		queue.addAll(Arrays.asList(inputSplits));
+	public DefaultInputSplitAssigner(InputSplit[] splits) {
+		Collections.addAll(this.splits, splits);
 	}
-
-
-	@Override
-	public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-
-		this.splitMap.remove(groupVertex);
+	
+	public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
+		this.splits.addAll(splits);
 	}
-
-
+	
+	
 	@Override
-	public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
-		final Queue<InputSplit> queue = this.splitMap.get(vertex.getGroupVertex());
-		if (queue == null) {
-			final JobID jobID = vertex.getExecutionGraph().getJobID();
-			LOG.error("Cannot find split queue for vertex " + vertex.getGroupVertex() + " (job " + jobID + ")");
-			return null;
+	public InputSplit getNextInputSplit(String host) {
+		InputSplit next = null;
+		
+		// keep the synchronized part short
+		synchronized (this.splits) {
+			if (this.splits.size() > 0) {
+				next = this.splits.remove(this.splits.size() - 1);
+			}
 		}
-
-		InputSplit nextSplit = queue.poll();
-
-		if (LOG.isDebugEnabled() && nextSplit != null) {
-			LOG.debug("Assigning split " + nextSplit.getSplitNumber() + " to " + vertex);
+		
+		if (LOG.isDebugEnabled()) {
+			if (next != null) {
+				LOG.debug("Assigning split " + next + " to " + host);
+			} else {
+				LOG.debug("No more input splits available");
+			}
 		}
-
-		return nextSplit;
+		return next;
 	}
 }
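
A hypothetical usage sketch for the simplified assigner above (it assumes GenericInputSplit offers a (partitionNumber, totalPartitions) constructor, which is not shown in this diff): splits are handed out one by one regardless of the requesting host, and null signals that none are left.

    import java.util.Arrays;

    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.runtime.jobmanager.splitassigner.DefaultInputSplitAssigner;

    public class DefaultAssignerExample {

        public static void main(String[] args) {
            // the two-argument constructor (partition number, total partitions) is assumed here
            InputSplit[] splits = {
                    new GenericInputSplit(0, 3),
                    new GenericInputSplit(1, 3),
                    new GenericInputSplit(2, 3)
            };
            DefaultInputSplitAssigner assigner = new DefaultInputSplitAssigner(Arrays.asList(splits));

            InputSplit next;
            // the host argument is not used by this assigner; null means no splits are left
            while ((next = assigner.getNextInputSplit("some-host")) != null) {
                System.out.println("got split " + next.getSplitNumber());
            }
        }
    }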

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
deleted file mode 100644
index 70e7ef9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitAssigner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-
-/**
- * The input split assigner interface must be implemented by every component which is supposed to dynamically offer
- * input splits to the input vertices of a job at runtime.
- * 
- */
-public interface InputSplitAssigner {
-
-	/**
-	 * Registers an input vertex with the input split assigner.
-	 * 
-	 * @param groupVertex
-	 *        the input vertex to register
-	 */
-	void registerGroupVertex(ExecutionGroupVertex groupVertex);
-
-	/**
-	 * Unregisters an input vertex from the input split assigner. All resources allocated to the input vertex are freed
-	 * as part of this operation.
-	 * 
-	 * @param groupVertex
-	 *        the input vertex to unregister
-	 */
-	void unregisterGroupVertex(ExecutionGroupVertex groupVertex);
-
-	/**
-	 * Returns the next input split that shall be consumed by the given input vertex.
-	 * 
-	 * @param vertex
-	 *        the vertex for which the next input split to be consumed shall be determined
-	 * @return the next input split to be consumed or <code>null</code> if no more splits shall be consumed by the given
-	 *         vertex
-	 */
-	InputSplit getNextInputSplit(ExecutionVertex vertex);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
deleted file mode 100644
index d9de4ee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitAssigner;
-import org.apache.flink.util.StringUtils;
-
-/**
- * The input split manager is responsible for serving input splits to {@link AbstractInputTask} objects at runtime.
- * Before passed on to the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler}, an {@link ExecutionGraph} is registered with the input split
- * manager and all included input vertices of the graph register their generated input splits with the manager. Each
- * type of input split can be assigned to a specific {@link InputSplitAssigner} which is loaded by the input split
- * manager at runtime.
- * <p>
- * This class is thread-safe.
- */
-public final class InputSplitManager {
-
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(InputSplitManager.class);
-
-	/**
-	 * The prefix of the configuration key which is used to retrieve the class names of the individual
-	 * {@link InputSplitAssigner} classes
-	 */
-	private static final String INPUT_SPLIT_CONFIG_KEY_PREFIX = "inputsplit.assigner.";
-
-	/**
-	 * A cache which stores the mapping of group vertices to assigner objects for fast retrieval during the job
-	 * execution.
-	 */
-	private final Map<ExecutionGroupVertex, InputSplitAssigner> assignerCache = new ConcurrentHashMap<ExecutionGroupVertex, InputSplitAssigner>();
-
-	/**
-	 * A map holding an instance of each available {@link InputSplitAssigner}, accessible via the class name of the
-	 * corresponding split type.
-	 */
-	private final Map<Class<? extends InputSplit>, InputSplitAssigner> loadedAssigners = new HashMap<Class<? extends InputSplit>, InputSplitAssigner>();
-
-	/**
-	 * The input split tracker makes sure that a vertex retrieves the same sequence of input splits after being
-	 * restarted.
-	 */
-	private final InputSplitTracker inputSplitTracker = new InputSplitTracker();
-
-	/**
-	 * The default input split assigner which is always used if a more specific assigner cannot be found.
-	 */
-	private final InputSplitAssigner defaultAssigner = new DefaultInputSplitAssigner();
-
-	/**
-	 * Registers a new job represented by its {@link ExecutionGraph} with the input split manager.
-	 * 
-	 * @param executionGraph
-	 *        the job to be registered
-	 */
-	public void registerJob(final ExecutionGraph executionGraph) {
-
-		final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
-		while (it.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it.next();
-			final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
-			if (inputSplits == null) {
-				continue;
-			}
-
-			if (inputSplits.length == 0) {
-				continue;
-			}
-
-			final InputSplitAssigner assigner = getAssignerByType(groupVertex.getInputSplitType(), true);
-			// Add entry to cache for fast retrieval during the job execution
-			this.assignerCache.put(groupVertex, assigner);
-
-			assigner.registerGroupVertex(groupVertex);
-		}
-
-		// Register job with the input split tracker
-		this.inputSplitTracker.registerJob(executionGraph);
-	}
-
-	/**
-	 * Unregisters the given job represented by its {@link ExecutionGraph} with the input split manager.
-	 * 
-	 * @param executionGraph
-	 *        the job to be unregistered
-	 */
-	public void unregisterJob(final ExecutionGraph executionGraph) {
-
-		final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(executionGraph, true, -1);
-		while (it.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it.next();
-			final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
-			if (inputSplits == null) {
-				continue;
-			}
-
-			if (inputSplits.length == 0) {
-				continue;
-			}
-
-			final InputSplitAssigner assigner = this.assignerCache.remove(groupVertex);
-			if (assigner == null) {
-				LOG.error("Group vertex " + groupVertex.getName()
-					+ " is unregistered, but cannot be found in assigner cache");
-				continue;
-			}
-
-			assigner.unregisterGroupVertex(groupVertex);
-		}
-
-		// Unregister job from input split tracker
-		this.inputSplitTracker.unregisterJob(executionGraph);
-	}
-
-	/**
-	 * Returns the next input split the input split manager (or the responsible {@link InputSplitAssigner} to be more
-	 * precise) has chosen for the given vertex to consume.
-	 * 
-	 * @param vertex
-	 *        the vertex for which the next input split is to be determined
-	 * @param sequenceNumber
-	 *        the sequence number of the vertex's request
-	 * @return the next input split to consume or <code>null</code> if the vertex shall consume no more input splits
-	 */
-	public InputSplit getNextInputSplit(final ExecutionVertex vertex, final int sequenceNumber) {
-
-		InputSplit nextInputSplit = this.inputSplitTracker.getInputSplitFromLog(vertex, sequenceNumber);
-		if (nextInputSplit != null) {
-			LOG.info("Input split " + nextInputSplit.getSplitNumber() + " for vertex " + vertex + " replayed from log");
-			return nextInputSplit;
-		}
-
-		final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
-		final InputSplitAssigner inputSplitAssigner = this.assignerCache.get(groupVertex);
-		if (inputSplitAssigner == null) {
-			final JobID jobID = groupVertex.getExecutionStage().getExecutionGraph().getJobID();
-			LOG.error("Cannot find input assigner for group vertex " + groupVertex.getName() + " (job " + jobID + ")");
-			return null;
-		}
-
-		nextInputSplit = inputSplitAssigner.getNextInputSplit(vertex);
-		if (nextInputSplit != null) {
-			this.inputSplitTracker.addInputSplitToLog(vertex, sequenceNumber, nextInputSplit);
-			LOG.info(vertex + " receives input split " + nextInputSplit.getSplitNumber());
-		}
-
-		return nextInputSplit;
-	}
-
-	/**
-	 * Returns the {@link InputSplitAssigner} which is defined for the given type of input split.
-	 * 
-	 * @param inputSplitType
-	 *        the type of input split to find the corresponding {@link InputSplitAssigner} for
-	 * @param allowLoading
-	 *        <code>true</code> to indicate that the input split assigner is allowed to load additional classes if
-	 *        necessary, <code>false</code> otherwise
-	 * @return the {@link InputSplitAssigner} responsible for the given type of input split
-	 */
-	private InputSplitAssigner getAssignerByType(final Class<? extends InputSplit> inputSplitType,
-			final boolean allowLoading) {
-
-		synchronized (this.loadedAssigners) {
-
-			InputSplitAssigner assigner = this.loadedAssigners.get(inputSplitType);
-			if (assigner == null && allowLoading) {
-				assigner = loadInputSplitAssigner(inputSplitType);
-				if (assigner != null) {
-					this.loadedAssigners.put(inputSplitType, assigner);
-				}
-			}
-
-			if (assigner != null) {
-				return assigner;
-			}
-		}
-
-		LOG.warn("Unable to find specific input split provider for type " + inputSplitType.getName()
-			+ ", using default assigner");
-
-		return this.defaultAssigner;
-	}
-
-	/**
-	 * Attempts to find the responsible type of {@link InputSplitAssigner} for the given type of input split from the
-	 * configuration and instantiate an object for it.
-	 * 
-	 * @param inputSplitType
-	 *        the type of input split to load the {@link InputSplitAssigner} for
-	 * @return the newly loaded {@link InputSplitAssigner} object or <code>null</code> if no such object could be
-	 *         located or loaded
-	 */
-	private InputSplitAssigner loadInputSplitAssigner(final Class<? extends InputSplit> inputSplitType) {
-
-		final String className = inputSplitType.getName();
-		
-		final String assignerKey = INPUT_SPLIT_CONFIG_KEY_PREFIX + className;
-		LOG.info("Trying to load input split assigner for type " + className);
-
-		String assignerClassName = GlobalConfiguration.getString(assignerKey, null);
-
-		// Provide hard-wired default configuration for FileInputSplit objects to make configuration more robust
-		if (assignerClassName == null) {
-			if (FileInputSplit.class == inputSplitType) {
-				return new FileInputSplitAssigner();
-			}
-			else if (GenericInputSplit.class == inputSplitType) {
-				return new DefaultInputSplitAssigner();
-			}
-			else {
-				return null;
-			}
-		}
-
-		try {
-			final Class<? extends InputSplitAssigner> assignerClass =
-					Class.forName(assignerClassName).asSubclass(InputSplitAssigner.class);
-			return assignerClass.newInstance();
-		}
-		catch (Exception e) {
-			LOG.error(StringUtils.stringifyException(e));
-		}
-
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
deleted file mode 100644
index 013fbec..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-
-/**
- * The input split tracker maintains a log of all the input splits that are handed out to the individual input vertices.
- * In case of an input vertex must be restarted the input split tracker makes sure that the vertex receives the same
- * sequence of input splits as in its original run up to the point that it crashed.
- * <p>
- * This class is thread-safe.
- * 
- */
-final class InputSplitTracker {
-
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(InputSplitTracker.class);
-
-	/**
-	 * The central split map which stores the logs of the individual input vertices.
-	 */
-	private final ConcurrentMap<ExecutionVertexID, List<InputSplit>> splitMap = new ConcurrentHashMap<ExecutionVertexID, List<InputSplit>>();
-
-	/**
-	 * Constructor with package visibility only.
-	 */
-	InputSplitTracker() {
-	}
-
-	/**
-	 * Registers a new job with the input split tracker.
-	 * 
-	 * @param eg
-	 *        the execution graph of the job to be registered
-	 */
-	void registerJob(final ExecutionGraph eg) {
-
-		final Iterator<ExecutionGroupVertex> it = new ExecutionGroupVertexIterator(eg, true, -1);
-		while (it.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = it.next();
-			final InputSplit[] inputSplits = groupVertex.getInputSplits();
-
-			if (inputSplits == null) {
-				continue;
-			}
-
-			if (inputSplits.length == 0) {
-				continue;
-			}
-
-			for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-				final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-				if (this.splitMap.put(vertex.getID(), new ArrayList<InputSplit>()) != null) {
-					LOG.error("InputSplitTracker must keep track of two vertices with ID " + vertex.getID());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Unregisters a job from the input split tracker.
-	 * 
-	 * @param eg
-	 *        the execution graph of the job to be unregistered
-	 */
-	void unregisterJob(final ExecutionGraph eg) {
-
-		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
-		while (it.hasNext()) {
-			this.splitMap.remove(it.next().getID());
-		}
-	}
-
-	/**
-	 * Returns the input split with the given sequence number from the specified vertex's log or <code>null</code> if no
-	 * such input split exists.
-	 * 
-	 * @param vertex
-	 *        the vertex for which the input split shall be returned from the log
-	 * @param sequenceNumber
-	 *        the sequence number identifying the log entry
-	 * @return the input split that was stored under the given sequence number of the vertex's log or <code>null</code>
-	 *         if no such input split exists
-	 */
-	InputSplit getInputSplitFromLog(final ExecutionVertex vertex, final int sequenceNumber) {
-
-		final List<InputSplit> inputSplitLog = this.splitMap.get(vertex.getID());
-		if (inputSplitLog == null) {
-			LOG.error("Cannot find input split log for vertex " + vertex + " (" + vertex.getID() + ")");
-			return null;
-		}
-
-		synchronized (inputSplitLog) {
-
-			if (sequenceNumber < inputSplitLog.size()) {
-				return inputSplitLog.get(sequenceNumber);
-			}
-		}
-
-		return null;
-	}
-
-	/**
-	 * Adds the given input split to the vertex's log and stores it under the specified sequence number.
-	 * 
-	 * @param vertex
-	 *        the vertex for which the input split shall be stored
-	 * @param sequenceNumber
-	 *        the sequence number identifying the log entry under which the input split shall be stored
-	 * @param inputSplit
-	 *        the input split to be stored
-	 */
-	void addInputSplitToLog(final ExecutionVertex vertex, final int sequenceNumber, final InputSplit inputSplit) {
-
-		final List<InputSplit> inputSplitLog = this.splitMap.get(vertex.getID());
-		if (inputSplitLog == null) {
-			LOG.error("Cannot find input split log for vertex " + vertex + " (" + vertex.getID() + ")");
-			return;
-		}
-
-		synchronized (inputSplitLog) {
-			if (inputSplitLog.size() != sequenceNumber) {
-				LOG.error("Expected input split with sequence number " + inputSplitLog.size() + " for vertex " + vertex
-					+ " (" + vertex.getID() + ") but received " + sequenceNumber + ", skipping...");
-				return;
-			}
-
-			inputSplitLog.add(inputSplit);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
index 6713714..958fa4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitWrapper.java
@@ -28,12 +28,10 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.StringUtils;
 
 /**
  * An input split wrapper object wraps an input split for RPC calls. In particular, the input split wrapper ensures that
  * the right class loader is used to instantiate the wrapped input split object.
- * 
  */
 public final class InputSplitWrapper implements IOReadableWritable {
 
@@ -45,7 +43,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
 	/**
 	 * The wrapped input split.
 	 */
-	private InputSplit inputSplit = null;
+	private InputSplit inputSplit;
 
 	/**
 	 * Constructs a new input split wrapper.
@@ -55,10 +53,9 @@ public final class InputSplitWrapper implements IOReadableWritable {
 	 * @param inputSplit
 	 *        the input split to be wrapped
 	 */
-	public InputSplitWrapper(final JobID jobID, final InputSplit inputSplit) {
-
+	public InputSplitWrapper(JobID jobID, InputSplit inputSplit) {
 		if (jobID == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
+			throw new NullPointerException();
 		}
 
 		this.jobID = jobID;
@@ -74,7 +71,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 
 		// Write the job ID
 		this.jobID.write(out);
@@ -96,7 +93,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void read(final DataInputView in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 
 		// Read the job ID
 		this.jobID.read(in);
@@ -117,15 +114,15 @@ public final class InputSplitWrapper implements IOReadableWritable {
 			try {
 				splitClass = (Class<? extends InputSplit>) Class.forName(className, true, cl);
 			} catch (ClassNotFoundException e) {
-				throw new IOException(StringUtils.stringifyException(e));
+				throw new IOException(e);
 			}
 
 			try {
 				this.inputSplit = splitClass.newInstance();
 			} catch (InstantiationException e) {
-				throw new IOException(StringUtils.stringifyException(e));
+				throw new IOException(e);
 			} catch (IllegalAccessException e) {
-				throw new IOException(StringUtils.stringifyException(e));
+				throw new IOException(e);
 			}
 
 			// Read the input split itself
@@ -142,7 +139,6 @@ public final class InputSplitWrapper implements IOReadableWritable {
 	 * @return the wrapped input split, possibly <code>null</code>
 	 */
 	public InputSplit getInputSplit() {
-
 		return this.inputSplit;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 6a45866..08b5b65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -16,105 +16,182 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.splitassigner;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
 
 /**
- * The locatable input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
- * {@link LocatableInputSplit} objects. The locatable input split assigner offers to take the storage location of the
- * individual locatable input splits into account. It attempts to always assign the splits to vertices in a way that the
- * data locality is preserved as well as possible.
- * <p>
- * This class is thread-safe.
- * 
+ * The locatable input split assigner assigns to each host the splits that are local to that host
+ * before falling back to splits that are not local.
  */
 public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
 	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
 
-	private final ConcurrentMap<ExecutionGroupVertex, LocatableInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, LocatableInputSplitList>();
-
 
-	@Override
-	public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
+	private final Set<LocatableInputSplit> unassigned = new HashSet<LocatableInputSplit>();
+	
+	private final ConcurrentHashMap<String, List<LocatableInputSplit>> localPerHost = new ConcurrentHashMap<String, List<LocatableInputSplit>>();
+	
+	private int localAssignments;		// lock protected by the unassigned set lock
+	
+	private int remoteAssignments;		// lock protected by the unassigned set lock
 
-		if (!LocatableInputSplit.class.isAssignableFrom(groupVertex.getInputSplitType())) {
-			LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
-				+ " and cannot be handled by this split assigner");
-			return;
-		}
+	// --------------------------------------------------------------------------------------------
+	
+	public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
+		this.unassigned.addAll(splits);
+	}
+	
+	public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
+		Collections.addAll(this.unassigned, splits);
+	}
+	
+	// --------------------------------------------------------------------------------------------
 
-		// Ignore vertices that do not produce splits
-		final InputSplit[] inputSplits = groupVertex.getInputSplits();
-		if (inputSplits == null) {
-			return;
+	@Override
+	public LocatableInputSplit getNextInputSplit(String host) {
+		// for a null host, we return an arbitrary split
+		if (host == null) {
+			
+			synchronized (this.unassigned) {
+				Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+				if (iter.hasNext()) {
+					LocatableInputSplit next = iter.next();
+					iter.remove();
+					
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Assigning arbitrary split to null host.");
+					}
+					
+					remoteAssignments++;
+					return next;
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("No more input splits remaining.");
+					}
+					return null;
+				}
+			}
 		}
-
-		if (inputSplits.length == 0) {
-			return;
+		
+		host = host.toLowerCase(Locale.US);
+		
+		// for any non-null host, we take the list of non-null splits
+		List<LocatableInputSplit> localSplits = this.localPerHost.get(host);
+		
+		// if we have no list for this host yet, create one
+		if (localSplits == null) {
+			localSplits = new ArrayList<LocatableInputSplit>(16);
+			
+			// lock the list, so that other threads requesting this host's local list wait until it is populated
+			synchronized (localSplits) {
+				List<LocatableInputSplit> prior = this.localPerHost.putIfAbsent(host, localSplits);
+				
+				// if someone else beat us in the race to create this list, then we do not populate this one, but
+				// simply work with that other list
+				if (prior == null) {
+					// we are the first, we populate
+					
+					// first, copy the remaining splits to release the lock on the set early
+					// because that is shared among threads
+					LocatableInputSplit[] remaining;
+					synchronized (this.unassigned) {
+						remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]);
+					}
+					
+					for (LocatableInputSplit is : remaining) {
+						if (isLocal(host, is.getHostnames())) {
+							localSplits.add(is);
+						}
+					}
+				}
+				else {
+					// someone else was faster
+					localSplits = prior;
+				}
+			}
 		}
-
-		final LocatableInputSplitList splitStore = new LocatableInputSplitList();
-		if (this.vertexMap.putIfAbsent(groupVertex, splitStore) != null) {
-			LOG.error(groupVertex.getName()
-				+ " appears to be already registered with the locatable input split assigner, ignoring vertex...");
-			return;
+		
+		// at this point, we have a list of local splits (possibly empty)
+		// we need to make sure no one else operates in the current list (that protects against
+		// list creation races) and that the unassigned set is consistent
+		// NOTE: we need to obtain the locks in this order, strictly!!!
+		synchronized (localSplits) {
+			int size = localSplits.size();
+			if (size > 0) {
+				synchronized (this.unassigned) {
+					do {
+						--size;
+						LocatableInputSplit split = localSplits.remove(size);
+						if (this.unassigned.remove(split)) {
+							
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Assigning local split to host " + host);
+							}
+							
+							localAssignments++;
+							return split;
+						}
+					} while (size > 0);
+				}
+			}
 		}
-
-		synchronized (splitStore) {
-
-			for (int i = 0; i < inputSplits.length; ++i) {
-				// TODO: Improve this
-				final InputSplit inputSplit = inputSplits[i];
-				if (!(inputSplit instanceof LocatableInputSplit)) {
-					LOG.error("Input split " + i + " of vertex " + groupVertex.getName() + " is of type "
-						+ inputSplit.getClass() + ", ignoring split...");
-					continue;
+		
+		// we did not find a local split, return any
+		synchronized (this.unassigned) {
+			Iterator<LocatableInputSplit> iter = this.unassigned.iterator();
+			if (iter.hasNext()) {
+				LocatableInputSplit next = iter.next();
+				iter.remove();
+				
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Assigning remote split to host " + host);
 				}
-				splitStore.addSplit((LocatableInputSplit) inputSplit);
+				
+				remoteAssignments++;
+				return next;
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("No more input splits remaining.");
+				}
+				return null;
 			}
-
 		}
 	}
-
-
-	@Override
-	public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-		this.vertexMap.remove(groupVertex);
-	}
-
-
-	@Override
-	public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
-		final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
-		final LocatableInputSplitList splitStore = this.vertexMap.get(groupVertex);
-
-		if (splitStore == null) {
-			return null;
+	
+	private static final boolean isLocal(String host, String[] hosts) {
+		if (host == null || hosts == null) {
+			return false;
 		}
-
-		final Instance instance = vertex.getAllocatedResource().getInstance();
-		if (instance == null) {
-			LOG.error("Instance is null, returning random split");
-			return null;
+		
+		for (String h : hosts) {
+			if (h != null && host.equals(h.toLowerCase(Locale.US))) {
+				return true;
+			}
 		}
-
-		return splitStore.getNextInputSplit(instance);
+		
+		return false;
+	}
+	
+	public int getNumberOfLocalAssignments() {
+		return localAssignments;
+	}
+	
+	public int getNumberOfRemoteAssignments() {
+		return remoteAssignments;
 	}
-
 }
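
To make the new assignment strategy concrete, here is a small usage sketch. It relies only on the API visible in this patch; the LocatableInputSplit constructor taking a split number and an array of host names is an assumption about the reworked flink-core class, not something shown above.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.jobmanager.splitassigner.LocatableInputSplitAssigner;

public class LocalitySketch {

	public static void main(String[] args) {
		// two splits stored on host1, one split stored on host2
		// (the LocatableInputSplit(int, String[]) constructor is assumed here)
		List<LocatableInputSplit> splits = Arrays.asList(
				new LocatableInputSplit(0, new String[] { "host1" }),
				new LocatableInputSplit(1, new String[] { "host1" }),
				new LocatableInputSplit(2, new String[] { "host2" }));

		LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);

		// host1 first receives its two local splits, then falls back to the remote one
		System.out.println(assigner.getNextInputSplit("host1"));
		System.out.println(assigner.getNextInputSplit("host1"));
		System.out.println(assigner.getNextInputSplit("host1"));

		// prints "2 local / 1 remote"
		System.out.println(assigner.getNumberOfLocalAssignments() + " local / "
				+ assigner.getNumberOfRemoteAssignments() + " remote");
	}
}

A request with a null host (for example, when the scheduler has no location information) simply returns an arbitrary remaining split and is counted as a remote assignment.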

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
deleted file mode 100644
index 71fbf7a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * The locatable input split list stores the locatable input splits for an input vertex that are still expected to be
- * consumed. Besides simply storing the splits, the locatable input split list also computes the distance all
- * {@link org.apache.flink.runtime.instance.Instance} objects which request an input split and its nearest storage location with respect to the
- * underlying network topology. That way input splits are always given to consuming vertices in a way that data locality
- * is preserved as well as possible.
- * <p>
- * This class is not thread-safe.
- * 
- */
-public final class LocatableInputSplitList {
-
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitList.class);
-
-	/**
-	 * The set containing all the locatable input splits that still must be consumed.
-	 */
-	private Set<LocatableInputSplit> masterSet = new HashSet<LocatableInputSplit>();
-
-	/**
-	 * The map caching the specific file input split lists for each {@link org.apache.flink.runtime.instance.Instance}.
-	 */
-	private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
-
-	/**
-	 * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
-	 * {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 */
-	private final class QueueElem implements Comparable<QueueElem> {
-
-		/**
-		 * The locatable input split the distance applies to.
-		 */
-		final LocatableInputSplit inputSplit;
-
-		/**
-		 * The minimum distance between the file input split's storage locations and the instance this object has been
-		 * created for.
-		 */
-		final int distance;
-
-		/**
-		 * Creates a new queue element.
-		 * 
-		 * @param inputSplit
-		 *        the locatable input split to be stored
-		 * @param distance
-		 *        the minimum distance between the stored input split's storage locations and the instance this object
-		 *        has been created for
-		 */
-		private QueueElem(final LocatableInputSplit inputSplit, final int distance) {
-			this.inputSplit = inputSplit;
-			this.distance = distance;
-		}
-
-		/**
-		 * Returns the locatable input split stored within this object.
-		 * 
-		 * @return the locatable input split
-		 */
-		private LocatableInputSplit getInputSplit() {
-			return this.inputSplit;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int compareTo(final QueueElem o) {
-
-			return (this.distance - o.distance);
-		}
-
-	}
-
-	/**
-	 * Adds the given locate input split to the set of locatable input splits to be consumed.
-	 * 
-	 * @param locatableInputSplit
-	 *        the locatable input split to be added
-	 */
-	synchronized void addSplit(final LocatableInputSplit locatableInputSplit) {
-
-		this.masterSet.add(locatableInputSplit);
-	}
-
-	/**
-	 * Returns the next locatable input split to be consumed by the given instance. The returned input split is selected
-	 * in a
-	 * way that the distance between the split's storage location and the requesting {@link org.apache.flink.runtime.instance.Instance} is as
-	 * short as possible.
-	 * 
-	 * @param instance
-	 *        the instance requesting the next file input split
-	 * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
-	 *         already been consumed.
-	 */
-	synchronized LocatableInputSplit getNextInputSplit(final Instance instance) {
-
-		final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
-
-		while (true) {
-
-			final QueueElem candidate = instanceSplitList.poll();
-			if (candidate == null) {
-				return null;
-			}
-
-			if (this.masterSet.remove(candidate.getInputSplit())) {
-				if (LOG.isInfoEnabled()) {
-					if (candidate.distance == 0) {
-						LOG.info(instance + " receives local file input split");
-					} else {
-						LOG.info(instance + " receives remote file input split (distance " + candidate.distance + ")");
-					}
-				}
-				return candidate.getInputSplit();
-			}
-
-			if (this.masterSet.isEmpty()) {
-				return null;
-			}
-		}
-	}
-
-	/**
-	 * Returns a list of locatable input splits specifically ordered for the given {@link org.apache.flink.runtime.instance.Instance}. When the
-	 * list is initially created, it contains all the unconsumed located input splits at that point in time, ascendingly
-	 * ordered
-	 * by the minimum distance between the input splits' storage locations and the given {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 * @param instance
-	 *        the instance for which the locatable input split list has been computed
-	 * @return the list of file input splits ordered specifically for the given instance
-	 */
-	private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
-
-		Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
-		if (instanceSplitList == null) {
-
-			// Create and populate instance specific split list
-			instanceSplitList = new PriorityQueue<LocatableInputSplitList.QueueElem>();
-			final Iterator<LocatableInputSplit> it = this.masterSet.iterator();
-			while (it.hasNext()) {
-
-				final LocatableInputSplit split = it.next();
-				final String[] hostnames = split.getHostnames();
-				if (hostnames == null) {
-					instanceSplitList.add(new QueueElem(split, Integer.MAX_VALUE));
-
-				} else {
-
-					int minDistance = Integer.MAX_VALUE;
-					for (int i = 0; i < hostnames.length; ++i) {
-						final int distance = instance.getDistance(hostnames[i]);
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Distance between " + instance + " and " + hostnames[i] + " is " + distance);
-						}
-						if (distance < minDistance) {
-							minDistance = distance;
-						}
-					}
-
-					instanceSplitList.add(new QueueElem(split, minDistance));
-				}
-			}
-
-			this.instanceMap.put(instance, instanceSplitList);
-		}
-
-		return instanceSplitList;
-	}
-}
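
For reference, the ordering scheme implemented by the class deleted above works as follows: for each requesting instance, pair every unconsumed split with the minimum network distance between the instance and the split's storage hosts, and serve the pairs from a priority queue, closest first. The following is a stripped-down, self-contained sketch of that idea; the toy distance() function is a stand-in for Instance.getDistance() and is not part of the original code.

import java.util.PriorityQueue;

public class MinDistanceOrderingSketch {

	/** Pairs a split's storage hosts with its minimum distance to the requesting host. */
	static final class Candidate implements Comparable<Candidate> {
		final String[] splitHosts;
		final int distance;

		Candidate(String[] splitHosts, int distance) {
			this.splitHosts = splitHosts;
			this.distance = distance;
		}

		@Override
		public int compareTo(Candidate other) {
			return Integer.compare(this.distance, other.distance);
		}
	}

	/** Toy distance function standing in for Instance.getDistance(). */
	static int distance(String requestingHost, String splitHost) {
		return requestingHost.equals(splitHost) ? 0 : 2;
	}

	public static void main(String[] args) {
		String requestingHost = "host1";
		String[][] splitLocations = { { "host2" }, { "host1", "host3" }, { } };

		PriorityQueue<Candidate> queue = new PriorityQueue<Candidate>();
		for (String[] hosts : splitLocations) {
			int min = Integer.MAX_VALUE;
			for (String h : hosts) {
				min = Math.min(min, distance(requestingHost, h));
			}
			queue.add(new Candidate(hosts, min));
		}

		// candidates are polled in order of increasing distance: the local split first,
		// then the remote one, then the split without location information
		while (!queue.isEmpty()) {
			System.out.println(queue.poll().distance);
		}
	}
}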

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
deleted file mode 100644
index 383ed38..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner.file;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitAssigner;
-
-/**
- * The file input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
- * {@link FileInputSplit} objects. The file input split assigner offers to take the storage location of the individual
- * file input splits into account. It attempts to always assign the splits to vertices in a way that the data locality
- * is preserved as well as possible.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class FileInputSplitAssigner implements InputSplitAssigner {
-
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitAssigner.class);
-
-	private final ConcurrentMap<ExecutionGroupVertex, FileInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, FileInputSplitList>();
-
-
-	@Override
-	public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
-
-		if (!FileInputSplit.class.equals(groupVertex.getInputSplitType())) {
-			LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
-				+ " and cannot be handled by this split assigner");
-			return;
-		}
-
-		// Ignore vertices that do not produce splits
-		final InputSplit[] inputSplits = groupVertex.getInputSplits();
-		if (inputSplits == null) {
-			return;
-		}
-
-		if (inputSplits.length == 0) {
-			return;
-		}
-
-		final FileInputSplitList splitStore = new FileInputSplitList();
-		if (this.vertexMap.putIfAbsent(groupVertex, splitStore) != null) {
-			LOG.error(groupVertex.getName()
-				+ " appears to be already registered with the file input split assigner, ignoring vertex...");
-			return;
-		}
-
-		synchronized (splitStore) {
-
-			for (int i = 0; i < inputSplits.length; ++i) {
-				// TODO: Improve this
-				final InputSplit inputSplit = inputSplits[i];
-				if (!(inputSplit instanceof FileInputSplit)) {
-					LOG.error("Input split " + i + " of vertex " + groupVertex.getName() + " is of type "
-						+ inputSplit.getClass() + ", ignoring split...");
-					continue;
-				}
-				splitStore.addSplit((FileInputSplit) inputSplit);
-			}
-
-		}
-	}
-
-	@Override
-	public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-		this.vertexMap.remove(groupVertex);
-	}
-
-	@Override
-	public InputSplit getNextInputSplit(final ExecutionVertex vertex) {
-
-		final ExecutionGroupVertex groupVertex = vertex.getGroupVertex();
-		final FileInputSplitList splitStore = this.vertexMap.get(groupVertex);
-
-		if (splitStore == null) {
-			return null;
-		}
-
-		final Instance instance = vertex.getAllocatedResource().getInstance();
-		if (instance == null) {
-			LOG.error("Instance is null, returning random split");
-			return null;
-		}
-
-		return splitStore.getNextInputSplit(instance);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
deleted file mode 100644
index 06cca24..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.splitassigner.file;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * The file input split list stores the file input splits for an input vertex that are still expected to be consumed.
- * Besides simply storing the splits, the file input split list also computes the distance all {@link org.apache.flink.runtime.instance.Instance}
- * objects which request a input split and its nearest storage location with respect to the underlying network topology.
- * That way input splits are always given to consuming vertices in a way that data locality is preserved as well as
- * possible.
- * <p>
- * This class is not thread-safe.
- * 
- */
-public final class FileInputSplitList {
-
-	/**
-	 * The logging object which is used to report information and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitList.class);
-
-	/**
-	 * The set containing all the file input splits that still must be consumed.
-	 */
-	private Set<FileInputSplit> masterSet = new HashSet<FileInputSplit>();
-
-	/**
-	 * The map caching the specific file input split lists for each {@link org.apache.flink.runtime.instance.Instance}.
-	 */
-	private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
-
-	/**
-	 * This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
-	 * {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 */
-	private final class QueueElem implements Comparable<QueueElem> {
-
-		/**
-		 * The file input split the distance applies to.
-		 */
-		final FileInputSplit inputSplit;
-
-		/**
-		 * The minimum distance between the file input split's storage locations and the instance this object has been
-		 * created for.
-		 */
-		final int distance;
-
-		/**
-		 * Creates a new queue element.
-		 * 
-		 * @param inputSplit
-		 *        the file input split to be stored
-		 * @param distance
-		 *        the minimum distance between the stored input split's storage locations and the instance this object
-		 *        has been created for
-		 */
-		private QueueElem(final FileInputSplit inputSplit, final int distance) {
-			this.inputSplit = inputSplit;
-			this.distance = distance;
-		}
-
-		/**
-		 * Returns the file input split stored within this object.
-		 * 
-		 * @return the file input split
-		 */
-		private FileInputSplit getInputSplit() {
-			return this.inputSplit;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int compareTo(final QueueElem o) {
-
-			return (this.distance - o.distance);
-		}
-
-	}
-
-	/**
-	 * Adds the given file input split to the set of file input splits to be consumed.
-	 * 
-	 * @param fileInputSplit
-	 *        the file input split to be added
-	 */
-	synchronized void addSplit(final FileInputSplit fileInputSplit) {
-
-		this.masterSet.add(fileInputSplit);
-	}
-
-	/**
-	 * Returns the next file input split to be consumed by the given instance. The returned input split is selected in a
-	 * way that the distance between the split's storage location and the requesting {@link org.apache.flink.runtime.instance.Instance} is as
-	 * short as possible.
-	 * 
-	 * @param instance
-	 *        the instance requesting the next file input split
-	 * @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
-	 *         already been consumed.
-	 */
-	synchronized FileInputSplit getNextInputSplit(final Instance instance) {
-
-		final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
-
-		while (true) {
-
-			final QueueElem candidate = instanceSplitList.poll();
-			if (candidate == null) {
-				return null;
-			}
-
-			if (this.masterSet.remove(candidate.getInputSplit())) {
-				if (LOG.isInfoEnabled()) {
-					if (candidate.distance == 0) {
-						LOG.info(instance + " receives local file input split");
-					} else {
-						LOG.info(instance + " receives remote file input split (distance " + candidate.distance + ")");
-					}
-				}
-				return candidate.getInputSplit();
-			}
-
-			if (this.masterSet.isEmpty()) {
-				return null;
-			}
-		}
-	}
-
-	/**
-	 * Returns a list of file input splits specifically ordered for the given {@link org.apache.flink.runtime.instance.Instance}. When the list is
-	 * initially created, it contains all the unconsumed file input splits at that point in time, ascendingly ordered by
-	 * the minimum distance between the input splits' storage locations and the given {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 * @param instance
-	 *        the instance for which the file input split list has been computed
-	 * @return the list of file input splits ordered specifically for the given instance
-	 */
-	private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
-
-		Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
-		if (instanceSplitList == null) {
-
-			// Create and populate instance specific split list
-			instanceSplitList = new PriorityQueue<FileInputSplitList.QueueElem>();
-			final Iterator<FileInputSplit> it = this.masterSet.iterator();
-			while (it.hasNext()) {
-
-				final FileInputSplit split = it.next();
-				final String[] hostNames = split.getHostNames();
-				if (hostNames == null) {
-					instanceSplitList.add(new QueueElem(split, Integer.MAX_VALUE));
-
-				} else {
-
-					int minDistance = Integer.MAX_VALUE;
-					for (int i = 0; i < hostNames.length; ++i) {
-						final int distance = instance.getDistance(hostNames[i]);
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Distance between " + instance + " and " + hostNames[i] + " is " + distance);
-						}
-						if (distance < minDistance) {
-							minDistance = distance;
-						}
-					}
-
-					instanceSplitList.add(new QueueElem(split, minDistance));
-				}
-			}
-
-			this.instanceMap.put(instance, instanceSplitList);
-		}
-
-		return instanceSplitList;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
index 82e7213..0bec1f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
@@ -28,12 +28,8 @@ import static org.junit.Assert.fail;
 import java.net.InetAddress;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.LogUtils;
-
-import org.apache.log4j.Level;
 
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 9380e26..290326c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -57,10 +57,9 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import org.apache.flink.runtime.types.IntegerRecord;
-import org.apache.flink.util.LogUtils;
+
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -71,12 +70,6 @@ public class LocalInstanceManagerTest {
 	
 	private int port;
 	
-	
-	@BeforeClass
-	public static void initLogger() {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
 	@Before
 	public void startJobManagerServer() {
 		try {