Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:53 UTC

[29/63] [abbrv] Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model
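
As a quick orientation before the diff: the sketch below pieces together the new schedule-on-demand flow from the calls that appear in the updated DefaultSchedulerTest further down in this patch. It is illustrative only; the class name, the package of ResourceId and ScheduledUnit, and the constructor arguments for InstanceConnectionInfo and HardwareDescription are assumptions taken from that test, not a verified usage guide.

// Minimal sketch of the schedule-on-demand flow, assembled from the calls
// visible in the updated DefaultSchedulerTest in this patch. Package locations
// of ResourceId and ScheduledUnit are assumed (the test uses them unqualified
// from the scheduler package).
import java.net.InetAddress;

import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;

public class ScheduleOnDemandSketch {

    public static void main(String[] args) throws Exception {
        DefaultScheduler scheduler = new DefaultScheduler();

        // TaskManagers are announced to the scheduler as they register,
        // instead of resources being pre-assigned to an execution graph.
        final long GB = 1024L * 1024 * 1024;
        InstanceConnectionInfo ci = new InstanceConnectionInfo(
                InetAddress.getByName("127.0.0.1"), 10000, 10001);
        HardwareDescription resources = new HardwareDescription(4, 4 * GB, 3 * GB, 2 * GB);
        Instance instance = new Instance(ci, new InstanceID(), resources, 2); // two task slots
        scheduler.newInstanceAvailable(instance);

        // Slots are handed out lazily, one ScheduledUnit at a time. Asking again
        // with the same ResourceId returns the same AllocatedSlot; running out of
        // slots raises NoResourceAvailableException (see the test below).
        JobID jobId = new JobID();
        ResourceId resourceId = new ResourceId();
        AllocatedSlot slot = scheduler.getResourceToScheduleUnit(
                new ScheduledUnit(jobId, new ExecutionVertex2(), resourceId), true);

        System.out.println("Allocated slot: " + slot);
    }
}
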

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index a479bd3..2d30cf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.taskmanager;
 
 import java.io.File;
@@ -55,8 +54,8 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.configuration.ConfigConstants;
@@ -75,6 +74,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.InsufficientResourcesException;
@@ -95,7 +95,6 @@ import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 import org.apache.flink.runtime.protocols.JobManagerProtocol;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.SerializableArrayList;
@@ -110,12 +109,19 @@ import org.apache.flink.util.StringUtils;
  */
 public class TaskManager implements TaskOperationProtocol {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
+	private static final Log LOG = LogFactory.getLog(TaskManager.class);
 
-	private final static int FAILURE_RETURN_CODE = -1;
+	private static final int STARTUP_FAILURE_RETURN_CODE = 1;
+	
+	private static final int CRITICAL_ERROR_RETURN_CODE = 2;
 	
 	private static final int IPC_HANDLER_COUNT = 1;
 	
+	private static final int MAX_LOST_HEART_BEATS = 3;
+	
+	private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
+	
+	
 	public final static String ARG_CONF_DIR = "tempDir";
 	
 	// --------------------------------------------------------------------------------------------
@@ -126,7 +132,6 @@ public class TaskManager implements TaskOperationProtocol {
 	
 	private final ExecutionMode executionMode;
 	
-	
 	private final JobManagerProtocol jobManager;
 
 	private final InputSplitProviderProtocol globalInputSplitProvider;
@@ -169,9 +174,12 @@ public class TaskManager implements TaskOperationProtocol {
 	
 	private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);
 	
+	private volatile InstanceID registeredId;
+	
 	/** Stores whether the task manager has already been shut down. */
 	private volatile boolean shutdownComplete;
 	
+	
 	/**
 	 * All parameters are obtained from the 
 	 * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
@@ -203,7 +211,7 @@ public class TaskManager implements TaskOperationProtocol {
 				jobManagerAddress = new InetSocketAddress(tmpAddress, port);
 			}
 			catch (UnknownHostException e) {
-				LOG.error("Could not resolve JobManager host name.");
+				LOG.fatal("Could not resolve JobManager host name.");
 				throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
 			}
 			
@@ -214,7 +222,7 @@ public class TaskManager implements TaskOperationProtocol {
 		try {
 			this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
 		} catch (IOException e) {
-			LOG.error("Could not connect to the JobManager: " + e.getMessage(), e);
+			LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
 			throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
 		}
 		
@@ -245,7 +253,7 @@ public class TaskManager implements TaskOperationProtocol {
 				this.taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostAddress(), ipcPort, IPC_HANDLER_COUNT);
 				this.taskManagerServer.start();
 			} catch (IOException e) {
-				LOG.error("Failed to start TaskManager server. " + e.getMessage(), e);
+				LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
 				throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
 			}
 		}
@@ -254,7 +262,7 @@ public class TaskManager implements TaskOperationProtocol {
 		try {
 			this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
 		} catch (IOException e) {
-			LOG.error(e.getMessage(), e);
+			LOG.fatal(e.getMessage(), e);
 			throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
 		}
 
@@ -262,7 +270,7 @@ public class TaskManager implements TaskOperationProtocol {
 		try {
 			this.lookupService = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
 		} catch (IOException e) {
-			LOG.error(e.getMessage(), e);
+			LOG.fatal(e.getMessage(), e);
 			throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
 		}
 
@@ -270,7 +278,7 @@ public class TaskManager implements TaskOperationProtocol {
 		try {
 			this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
 		} catch (IOException e) {
-			LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+			LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
 			throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
 		}
 
@@ -324,17 +332,13 @@ public class TaskManager implements TaskOperationProtocol {
 							ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
 							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
 
-					int lowWaterMark = GlobalConfiguration.getInteger(
-							ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
-							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
-
-					int highWaterMark = GlobalConfiguration.getInteger(
-							ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
-							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
+					int closeAfterIdleForMs = GlobalConfiguration.getInteger(
+							ConfigConstants.TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY,
+							ConfigConstants.DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS);
 
 					networkConnectionManager = new NettyConnectionManager(
 							localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
-							bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
+							bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
 					break;
 			}
 
@@ -391,7 +395,7 @@ public class TaskManager implements TaskOperationProtocol {
 				
 				this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize);
 			} catch (Throwable t) {
-				LOG.error("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
+				LOG.fatal("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
 				throw new Exception("Unable to initialize memory manager.", t);
 			}
 		}
@@ -400,15 +404,21 @@ public class TaskManager implements TaskOperationProtocol {
 
 		this.ioManager = new IOManager(tmpDirPaths);
 		
-		this.heartbeatThread = new Thread() {
-			@Override
-			public void run() {
-				runHeartbeatLoop();
-			}
-		};
-
-		this.heartbeatThread.setName("Heartbeat Thread");
-		this.heartbeatThread.start();
+		// start the heart beats
+		{
+			final long interval = GlobalConfiguration.getInteger(
+					ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
+					ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
+			
+			this.heartbeatThread = new Thread() {
+				@Override
+				public void run() {
+					registerAndRunHeartbeatLoop(interval, MAX_LOST_HEART_BEATS);
+				}
+			};
+			this.heartbeatThread.setName("Heartbeat Thread");
+			this.heartbeatThread.start();
+		}
 
 		// --------------------------------------------------------------------
 		// Memory Usage
@@ -500,7 +510,7 @@ public class TaskManager implements TaskOperationProtocol {
 			line = parser.parse(options, args);
 		} catch (ParseException e) {
 			System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
-			System.exit(FAILURE_RETURN_CODE);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
 		}
 
 		String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
@@ -524,8 +534,8 @@ public class TaskManager implements TaskOperationProtocol {
 		try {
 			new TaskManager(ExecutionMode.CLUSTER);
 		} catch (Exception e) {
-			LOG.error("Taskmanager startup failed: " + e.getMessage(), e);
-			System.exit(FAILURE_RETURN_CODE);
+			LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
 		}
 		
 		// park the main thread to keep the JVM alive (all other threads may be daemon threads)
@@ -537,63 +547,7 @@ public class TaskManager implements TaskOperationProtocol {
 		}
 	}
 
-	/**
-	 * This method send the periodic heartbeats.
-	 */
-	private void runHeartbeatLoop() {
-		final long interval = GlobalConfiguration.getInteger(
-						ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
-						ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
-
-		try {
-			while(!shutdownStarted.get()){
-				RegisterTaskManagerResult result  = this.jobManager.registerTaskManager(this
-								.localInstanceConnectionInfo,this.hardwareDescription,
-						new IntegerRecord(this.numberOfSlots));
-
-				if(result.getReturnCode() == RegisterTaskManagerResult.ReturnCode.SUCCESS){
-					break;
-				}
-
-				try{
-					Thread.sleep(50);
-				}catch(InterruptedException e){
-					if (!shutdownStarted.get()) {
-						LOG.error("TaskManager register task manager loop was interrupted without shutdown.");
-					}
-				}
-			}
-
-		} catch (IOException e) {
-			if(!shutdownStarted.get()){
-				LOG.error("Registering task manager caused an exception: " + e.getMessage(), e);
-			}
-			return;
-		}
 
-		while (!shutdownStarted.get()) {
-			// sleep until the next heart beat
-			try {
-				Thread.sleep(interval);
-			}
-			catch (InterruptedException e) {
-				if (!shutdownStarted.get()) {
-					LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
-				}
-			}
-
-			// send heart beat
-			try {
-				this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo);
-			} catch (IOException e) {
-				if (shutdownStarted.get()) {
-					break;
-				} else {
-					LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
-				}
-			}
-		}
-	}
 
 	
 	/**
@@ -979,7 +933,7 @@ public class TaskManager implements TaskOperationProtocol {
 				this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, id, newExecutionState,
 					optionalDescription));
 			} catch (IOException e) {
-				LOG.error("Could not update task execution state.", e);
+				LOG.error(e);
 			}
 		}
 	}
@@ -1046,7 +1000,7 @@ public class TaskManager implements TaskOperationProtocol {
 				this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
 			} catch (InterruptedException e) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Got interrupted while awaiting the termination of the executor service.", e);
+					LOG.debug(e);
 				}
 			}
 		}
@@ -1065,7 +1019,6 @@ public class TaskManager implements TaskOperationProtocol {
 
 	@Override
 	public void logBufferUtilization() {
-
 		this.channelManager.logBufferUtilization();
 	}
 
@@ -1090,38 +1043,8 @@ public class TaskManager implements TaskOperationProtocol {
 		this.channelManager.invalidateLookupCacheEntries(channelIDs);
 	}
 
-	/**
-	 * Checks, whether the given strings describe existing directories that are writable. If that is not
-	 * the case, an exception is raised.
-	 * 
-	 * @param tempDirs
-	 *        An array of strings which are checked to be paths to writable directories.
-	 * @throws Exception
-	 *         Thrown, if any of the mentioned checks fails.
-	 */
-	private static final void checkTempDirs(final String[] tempDirs) throws Exception {
-
-		for (int i = 0; i < tempDirs.length; ++i) {
-
-			final String dir = tempDirs[i];
-			if (dir == null) {
-				throw new Exception("Temporary file directory #" + (i + 1) + " is null.");
-			}
-
-			final File f = new File(dir);
-
-			if (!f.exists()) {
-				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
-			}
-
-			if (!f.isDirectory()) {
-				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
-			}
-
-			if (!f.canWrite()) {
-				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
-			}
-		}
+	public void cancelAndClearEverything() {
+		LOG.info("Cancelling all computations and discarding all cached data.");
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -1136,8 +1059,152 @@ public class TaskManager implements TaskOperationProtocol {
 		return this.executionMode;
 	}
 	
+	/**
+	 * Gets the ID under which the TaskManager is currently registered at its JobManager.
+	 * If the TaskManager has not been registered yet, or if it lost contact, this is null.
+	 * 
+	 * @return The ID under which the TaskManager is currently registered.
+	 */
+	public InstanceID getRegisteredId() {
+		return this.registeredId;
+	}
+	
+	/**
+	 * Checks if the TaskManager is properly registered and ready to receive work.
+	 * 
+	 * @return True, if the TaskManager is registered, false otherwise.
+	 */
+	public boolean isRegistered() {
+		return this.registeredId != null;
+	}
+	
 	// --------------------------------------------------------------------------------------------
-	//  Memory and Garbace Collection Debugging Utilities
+	//  Heartbeats
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This method registers the TaskManager at the JobManager and sends periodic heartbeats.
+	 */
+	private void registerAndRunHeartbeatLoop(long interval, int maxNonSuccessfulHeartbeats) {
+
+		while (!shutdownStarted.get()) {
+			InstanceID resultId = null;
+	
+			// try to register. We keep trying as long as necessary, because the JobManager may not be online yet
+			{
+				final long maxDelay = 10000;	// the maximal delay between registration attempts
+				final long reportingDelay = 5000;
+				long currentDelay = 100;		// initially, wait 100 msecs for the next registration attempt
+				
+				while (!shutdownStarted.get())
+				{
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Trying to register at Jobmanager...");
+					}
+					
+					try {
+						resultId = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo,
+								this.hardwareDescription, this.numberOfSlots);
+					}
+					catch (IOException e) {
+						// this may be if the job manager was not yet online
+						// if this has happened for a while, report it. if it has just happened
+						// at the very beginning, this may not mean anything (JM still in startup)
+						if (currentDelay >= reportingDelay) {
+							LOG.error("Connection to JobManager failed.", e);
+						} else if (LOG.isDebugEnabled()) {
+							LOG.debug("Could not connect to JobManager.", e);
+						}
+					}
+					
+					// check if we were accepted
+					if (resultId != null) {
+						// success
+						this.registeredId = resultId;
+						break;
+					} else {
+						// this is bad. The job manager refused us. report and try again later
+						LOG.error("Registration attempt refused by JobManager.");
+					}
+		
+					try {
+						Thread.sleep(currentDelay);
+					}
+					catch (InterruptedException e) {
+						// may be due to shutdown
+						if (!shutdownStarted.get()) {
+							LOG.error("TaskManager's registration loop was interrupted without shutdown.");
+						}
+					}
+					
+					// increase the time between registration attempts, to not keep on pinging overly frequently
+					currentDelay = Math.min(2 * currentDelay, maxDelay);
+				}
+			}
+			
+			// registration complete, or shutdown
+			int successiveUnsuccessfulHeartbeats = 0;
+			
+			// the heart beat loop
+			while (!shutdownStarted.get()) {
+				// sleep until the next heart beat
+				try {
+					Thread.sleep(interval);
+				}
+				catch (InterruptedException e) {
+					if (!shutdownStarted.get()) {
+						LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
+					}
+				}
+	
+				// send heart beat
+				try {
+					boolean accepted = this.jobManager.sendHeartbeat(resultId);
+					
+					if (accepted) {
+						// reset the unsuccessful heart beats
+						successiveUnsuccessfulHeartbeats = 0;
+					} else {
+						successiveUnsuccessfulHeartbeats++;
+						LOG.error("JobManager rejected heart beat.");
+					}
+				}
+				catch (IOException e) {
+					if (!shutdownStarted.get()) {
+						successiveUnsuccessfulHeartbeats++;
+						LOG.error("Sending the heart beat failed on I/O error: " + e.getMessage(), e);
+					}
+				}
+				
+				if (successiveUnsuccessfulHeartbeats == maxNonSuccessfulHeartbeats) {
+					// we are done for, we cannot connect to the jobmanager any more
+					// or we are not welcome there any more
+					// what to do now? Wait for a while and try to reconnect
+					LOG.error("TaskManager has lost connection to JobManager.");
+					
+					// mark us as disconnected and abort all computation
+					this.registeredId = null;
+					cancelAndClearEverything();
+					
+					// wait for a while, then attempt to register again
+					try {
+						Thread.sleep(DELAY_AFTER_LOST_CONNECTION);
+					}
+					catch (InterruptedException e) {
+						if (!shutdownStarted.get()) {
+							LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
+						}
+					}
+					
+					// leave the heart beat loop
+					break;
+				}
+			} // end heart beat loop
+		} // end while not shutdown
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Memory and Garbage Collection Debugging Utilities
 	// --------------------------------------------------------------------------------------------
 
 	private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
@@ -1175,4 +1242,55 @@ public class TaskManager implements TaskOperationProtocol {
 
 		return str.toString();
 	}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	// Miscellaneous Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Checks, whether the given strings describe existing directories that are writable. If that is not
+	 * the case, an exception is raised.
+	 * 
+	 * @param tempDirs An array of strings which are checked to be paths to writable directories.
+	 * @throws Exception Thrown, if any of the mentioned checks fails.
+	 */
+	private static final void checkTempDirs(final String[] tempDirs) throws Exception {
+		for (int i = 0; i < tempDirs.length; ++i) {
+			final String dir = tempDirs[i];
+			if (dir == null) {
+				throw new Exception("Temporary file directory #" + (i + 1) + " is null.");
+			}
+
+			final File f = new File(dir);
+
+			if (!f.exists()) {
+				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
+			}
+
+			if (!f.isDirectory()) {
+				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
+			}
+
+			if (!f.canWrite()) {
+				throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
+			}
+		}
+	}
+	
+	public static class EmergencyShutdownExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+		private final TaskManager tm;
+		
+		public EmergencyShutdownExceptionHandler(TaskManager tm) {
+			this.tm = tm;
+		}
+		
+		@Override
+		public void uncaughtException(Thread t, Throwable e) {
+			LOG.fatal("Thread " + t.getName() + " caused an unrecoverable exception.", e);
+			tm.shutdown();
+		}
+		
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
deleted file mode 100644
index 27fae1c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.taskmanager.transferenvelope;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.util.EnumUtils;
-
-public class RegisterTaskManagerResult implements IOReadableWritable {
-	public enum ReturnCode{
-		SUCCESS, FAILURE
-	};
-
-	public RegisterTaskManagerResult(){
-		this.returnCode = ReturnCode.SUCCESS;
-	}
-
-	public RegisterTaskManagerResult(ReturnCode returnCode){
-		this.returnCode = returnCode;
-	}
-
-	private ReturnCode returnCode;
-
-	public ReturnCode getReturnCode() { return this.returnCode; }
-
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		EnumUtils.writeEnum(out, this.returnCode);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 290326c..02c814c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 import org.apache.flink.runtime.protocols.JobManagerProtocol;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import org.apache.flink.runtime.types.IntegerRecord;
 
 import org.junit.After;
@@ -214,16 +213,6 @@ public class LocalInstanceManagerTest {
 		}
 
 		@Override
-		public void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo) {}
-
-		@Override
-		public RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
-				HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
-		{
-			return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
-		}
-
-		@Override
 		public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
index 76616b8..3a24f97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
@@ -16,185 +16,165 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
-import java.io.IOException;
-import java.util.List;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.GraphConversionException;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulingException;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
 /**
  * This class checks the functionality of the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} class
  */
-@SuppressWarnings("serial")
 public class DefaultSchedulerTest {
 
-
-	public static final class InputTask extends AbstractInvokable {
-
-		@Override
-		public void registerInputOutput() {
-			new RecordWriter<StringRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {}
-
-	}
-
-	public static final class OutputTask extends AbstractInvokable {
-
-		@Override
-		public void registerInputOutput() {
-			new RecordReader<StringRecord>(this, StringRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {}
-
-	}
-
-	public static final class DummyInputFormat extends GenericInputFormat<IntValue> {
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return true;
-		}
-
-		@Override
-		public IntValue nextRecord(IntValue reuse) throws IOException {
-			return null;
-		}
-	}
-
-	public static final class DummyOutputFormat implements OutputFormat<IntValue> {
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {}
-
-		@Override
-		public void writeRecord(IntValue record) {}
-
-		@Override
-		public void close() {}
-	}
-
-	/**
-	 * Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
-	 * 
-	 * @param channelType
-	 *        the channel type to connect the vertices with
-	 * @return a sample execution graph
-	 */
-	private ExecutionGraph createExecutionGraph(ChannelType channelType) {
-
-		final JobGraph jobGraph = new JobGraph("Job Graph");
-
-		final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
-		inputVertex.setInvokableClass(InputTask.class);
-		inputVertex.setInputFormat(new DummyInputFormat());
-		inputVertex.setNumberOfSubtasks(1);
-
-		final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
-		outputVertex.setInvokableClass(OutputTask.class);
-		outputVertex.setOutputFormat(new DummyOutputFormat());
-		outputVertex.setNumberOfSubtasks(1);
-
+	private int portNum = 10000;
+	
+	@Test
+	public void testAddAndRemoveInstance() {
 		try {
-			inputVertex.connectTo(outputVertex, channelType);
-		} catch (JobGraphDefinitionException e) {
-			fail(StringUtils.stringifyException(e));
+			DefaultScheduler scheduler = new DefaultScheduler();
+			
+			Instance i1 = getRandomInstance(2);
+			Instance i2 = getRandomInstance(2);
+			Instance i3 = getRandomInstance(2);
+			
+			assertEquals(0, scheduler.getNumberOfAvailableInstances());
+			scheduler.newInstanceAvailable(i1);
+			assertEquals(1, scheduler.getNumberOfAvailableInstances());
+			scheduler.newInstanceAvailable(i2);
+			assertEquals(2, scheduler.getNumberOfAvailableInstances());
+			scheduler.newInstanceAvailable(i3);
+			assertEquals(3, scheduler.getNumberOfAvailableInstances());
+			
+			// cannot add available instance again
+			try {
+				scheduler.newInstanceAvailable(i2);
+				fail("Scheduler accepted instance twice");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+			
+			// some instances die
+			assertEquals(3, scheduler.getNumberOfAvailableInstances());
+			scheduler.instanceDied(i2);
+			assertEquals(2, scheduler.getNumberOfAvailableInstances());
+			
+			// try to add a dead instance
+			try {
+				scheduler.newInstanceAvailable(i2);
+				fail("Scheduler accepted dead instance");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+				
+			}
+						
+			scheduler.instanceDied(i1);
+			assertEquals(1, scheduler.getNumberOfAvailableInstances());
+			scheduler.instanceDied(i3);
+			assertEquals(0, scheduler.getNumberOfAvailableInstances());
+			
+			assertFalse(i1.isAlive());
+			assertFalse(i2.isAlive());
+			assertFalse(i3.isAlive());
 		}
-
-		try {
-			LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-			return new ExecutionGraph(jobGraph, 1);
-
-		} catch (GraphConversionException e) {
-			fail(StringUtils.stringifyException(e));
-		} catch (IOException e) {
-			fail(StringUtils.stringifyException(e));
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
-
-		return null;
 	}
-
-	/**
-	 * Checks the behavior of the scheduleJob() method with a job consisting of two tasks connected via an in-memory
-	 * channel.
-	 */
+	
+	
 	@Test
-	public void testScheduleJobWithInMemoryChannel() {
-
-		final TestInstanceManager tim = new TestInstanceManager();
-		final TestDeploymentManager tdm = new TestDeploymentManager();
-		final DefaultScheduler scheduler = new DefaultScheduler(tdm, tim);
-
-		final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY);
-
+	public void testAssignToSlots() {
 		try {
+			final JobID jobId = new JobID();
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			
+			ResourceId id1 = new ResourceId();
+			ResourceId id2 = new ResourceId();
+			ResourceId id3 = new ResourceId();
+			ResourceId id4 = new ResourceId();
+			ResourceId id5 = new ResourceId();
+			ResourceId id6 = new ResourceId();
+			
+			AllocatedSlot s1 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id1), true);
+			AllocatedSlot s2 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id2), true);
+			AllocatedSlot s3 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id3), true);
+			AllocatedSlot s4 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id4), true);
+			AllocatedSlot s5 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id5), true);
+			AllocatedSlot s6 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id6), true);
+			
+			// no more slots available, the next call should throw an exception
 			try {
-				scheduler.scheduleJob(executionGraph);
-			} catch (SchedulingException e) {
-				fail(StringUtils.stringifyException(e));
-			}
-
-			// Wait for the deployment to complete
-			tdm.waitForDeployment();
-
-			assertEquals(executionGraph.getJobID(), tdm.getIDOfLastDeployedJob());
-			final List<ExecutionVertex> listOfDeployedVertices = tdm.getListOfLastDeployedVertices();
-			assertNotNull(listOfDeployedVertices);
-			// Vertices connected via in-memory channels must be deployed in a single cycle.
-			assertEquals(2, listOfDeployedVertices.size());
-
-			// Check if the release of the allocated resources works properly by simulating the vertices' life cycle
-			assertEquals(0, tim.getNumberOfReleaseMethodCalls());
-
-			// Simulate vertex life cycle
-			for (final ExecutionVertex vertex : listOfDeployedVertices) {
-				vertex.updateExecutionState(ExecutionState.STARTING);
-				vertex.updateExecutionState(ExecutionState.RUNNING);
-				vertex.updateExecutionState(ExecutionState.FINISHING);
-				vertex.updateExecutionState(ExecutionState.FINISHED);
+				scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), new ResourceId()), true);
+				fail("Scheduler accepted scheduling request without available resource.");
 			}
-
-			assertEquals(1, tim.getNumberOfReleaseMethodCalls());
-		} finally {
-			try {
-				LibraryCacheManager.unregister(executionGraph.getJobID());
-			} catch (IOException ioe) {
-				// Ignore exception here
+			catch (NoResourceAvailableException e) {
+				// expected
 			}
+			
+			// schedule something into the same slots as before
+			AllocatedSlot s1s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id1), true);
+			AllocatedSlot s2s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id2), true);
+			AllocatedSlot s3s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id3), true);
+			AllocatedSlot s4s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id4), true);
+			AllocatedSlot s5s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id5), true);
+			AllocatedSlot s6s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id6), true);
+			
+			assertEquals(s1, s1s);
+			assertEquals(s2, s2s);
+			assertEquals(s3, s3s);
+			assertEquals(s4, s4s);
+			assertEquals(s5, s5s);
+			assertEquals(s6, s6s);
 		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private Instance getRandomInstance(int numSlots) {
+		InetAddress address;
+		try {
+			address = InetAddress.getByName("127.0.0.1");
+		} catch (UnknownHostException e) {
+			throw new RuntimeException("Test could not create IP address for localhost loopback.");
+		}
+		
+		int ipcPort = portNum++;
+		int dataPort = portNum++;
+		
+		InstanceConnectionInfo ci = new InstanceConnectionInfo(address, ipcPort, dataPort);
+		
+		final long GB = 1024L*1024*1024;
+		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
+		
+		return new Instance(ci, new InstanceID(), resources, numSlots);
+	}
+	
+	private ExecutionVertex2 getDummyVertex() {
+		return new ExecutionVertex2();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
new file mode 100644
index 0000000..4a2a0cb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Test for the basic functionality of the {@link LifoSetQueue}.
+ */
+public class LifoSetQueueTest {
+
+	@Test
+	public void testSizeAddPollAndPeek() {
+		try {
+			LifoSetQueue<Integer> queue = new LifoSetQueue<Integer>();
+			
+			// empty queue
+			assertEquals(0, queue.size());
+			assertNull(queue.poll());
+			assertNull(queue.peek());
+			
+			// add some elements
+			assertTrue(queue.add(1));
+			assertTrue(queue.offer(2));
+			assertTrue(queue.offer(3));
+			assertEquals(3, queue.size());
+			
+			assertEquals(3, queue.peek().intValue());
+			
+			// prevent duplicates. note that the methods return true, because no capacity constraint is violated
+			assertTrue(queue.add(1));
+			assertTrue(queue.offer(1));
+			assertTrue(queue.add(3));
+			assertTrue(queue.offer(3));
+			assertTrue(queue.add(2));
+			assertTrue(queue.offer(2));
+			assertEquals(3, queue.size());
+			
+			// peek and poll some elements
+			assertEquals(3, queue.peek().intValue());
+			assertEquals(3, queue.size());
+			assertEquals(3, queue.poll().intValue());
+			assertEquals(2, queue.size());
+			assertEquals(2, queue.peek().intValue());
+			assertEquals(2, queue.size());
+			assertEquals(2, queue.poll().intValue());
+			assertEquals(1, queue.size());
+			assertEquals(1, queue.peek().intValue());
+			assertEquals(1, queue.size());
+			assertEquals(1, queue.poll().intValue());
+			assertEquals(0, queue.size());
+			assertTrue(queue.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * Remove is tricky, because it goes through the iterator and calls remove() on the iterator.
+	 */
+	@Test
+	public void testRemove() {
+		try {
+			LifoSetQueue<String> queue = new LifoSetQueue<String>();
+			queue.add("1");
+			queue.add("2");
+			queue.add("3");
+			queue.add("4");
+			queue.add("5");
+			queue.add("6");
+			queue.add("7");
+			
+			assertEquals(7, queue.size());
+			assertEquals("7", queue.peek());
+			
+			// remove non-existing
+			assertFalse(queue.remove("8"));
+			
+			// remove the last
+			assertTrue(queue.remove("7"));
+			// remove the first
+			assertTrue(queue.remove("1"));
+			// remove in the middle
+			assertTrue(queue.remove("3"));
+			
+			assertEquals(4, queue.size());
+			
+			// check that we can re-add the removed elements
+			assertTrue(queue.add("1"));
+			assertTrue(queue.add("7"));
+			assertTrue(queue.add("3"));
+			assertEquals(7, queue.size());
+			
+			// check the order
+			assertEquals("3", queue.poll());
+			assertEquals("7", queue.poll());
+			assertEquals("1", queue.poll());
+			assertEquals("6", queue.poll());
+			assertEquals("5", queue.poll());
+			assertEquals("4", queue.poll());
+			assertEquals("2", queue.poll());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
deleted file mode 100644
index 81a7d60..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.List;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.DeploymentManager;
-
-/**
- * This class provides an implementation of the {@DeploymentManager} interface which is used during
- * the unit tests.
- * <p>
- * This class is thread-safe.
- * 
- */
-public class TestDeploymentManager implements DeploymentManager {
-
-	/**
-	 * The ID of the job to be deployed.
-	 */
-	private volatile JobID jobID = null;
-
-	/**
-	 * The list of vertices to be deployed.
-	 */
-	private volatile List<ExecutionVertex> verticesToBeDeployed = null;
-
-	/**
-	 * Auxiliary object to synchronize on.
-	 */
-	private final Object synchronizationObject = new Object();
-
-
-	@Override
-	public void deploy(final JobID jobID, final Instance instance,
-			final List<ExecutionVertex> verticesToBeDeployed) {
-
-		this.jobID = jobID;
-		this.verticesToBeDeployed = verticesToBeDeployed;
-
-		synchronized (this.synchronizationObject) {
-			this.synchronizationObject.notify();
-		}
-	}
-
-	/**
-	 * Returns the ID of the last deployed job.
-	 */
-	JobID getIDOfLastDeployedJob() {
-
-		return this.jobID;
-	}
-
-	/**
-	 * Returns a list of the last deployed vertices.
-	 * 
-	 * @return a list of the last deployed vertices
-	 */
-	List<ExecutionVertex> getListOfLastDeployedVertices() {
-
-		return this.verticesToBeDeployed;
-	}
-
-	/**
-	 * Clears the internal state of the test deployment manager.
-	 */
-	void clear() {
-
-		this.jobID = null;
-		this.verticesToBeDeployed = null;
-	}
-
-	/**
-	 * Wait for the scheduler to complete the deployment.
-	 */
-	void waitForDeployment() {
-
-		while (this.jobID == null) {
-			synchronized (this.synchronizationObject) {
-				try {
-					this.synchronizationObject.wait(50);
-				} catch (InterruptedException e) {
-					// Ignore exception
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
deleted file mode 100644
index 9286def..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.net.Inet4Address;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.*;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.topology.NetworkNode;
-import org.apache.flink.runtime.topology.NetworkTopology;
-import org.apache.flink.util.StringUtils;
-
-/**
- * A dummy implementation of an {@link org.apache.flink.runtime.instance.InstanceManager}.
- */
-public final class TestInstanceManager implements InstanceManager {
-
-	/**
-	 * Counts the number of times the method releaseAllocatedResource is called.
-	 */
-	private volatile int numberOfReleaseCalls = 0;
-
-	/**
-	 * The instance listener.
-	 */
-	private volatile InstanceListener instanceListener = null;
-
-	/**
-	 * The list of resources allocated to a job.
-	 */
-	private final List<AllocatedResource> allocatedResources;
-
-	/**
-	 * The test instance
-	 */
-	private final TestInstance testInstance;
-
-	/**
-	 * Test implementation of {@link org.apache.flink.runtime.instance.Instance}.
-	 * 
-	 */
-	private static final class TestInstance extends Instance {
-
-		/**
-		 * Constructs a new test instance.
-		 * 
-		 * @param instanceConnectionInfo
-		 *        the instance connection information
-		 * @param parentNode
-		 *        the parent node in the network topology
-		 * @param networkTopology
-		 *        the network topology
-		 * @param hardwareDescription
-		 *        the hardware description
-		 * @param numberSlots
-		 * 		  the number of slots available on the instance
-		 */
-		public TestInstance(final InstanceConnectionInfo instanceConnectionInfo,
-				final NetworkNode parentNode, final NetworkTopology networkTopology,
-				final HardwareDescription hardwareDescription, int numberSlots) {
-			super(instanceConnectionInfo, parentNode, networkTopology, hardwareDescription, numberSlots);
-		}
-	}
-
-	/**
-	 * Constructs a new test instance manager
-	 */
-	public TestInstanceManager() {
-
-		final HardwareDescription hd = HardwareDescriptionFactory.construct(1, 1L, 1L);
-
-		this.allocatedResources = new ArrayList<AllocatedResource>();
-		try {
-			final InstanceConnectionInfo ici = new InstanceConnectionInfo(Inet4Address.getLocalHost(), 1, 1);
-			final NetworkTopology nt = new NetworkTopology();
-			this.testInstance = new TestInstance(ici, nt.getRootNode(), nt, hd, 1);
-			this.allocatedResources.add(new AllocatedResource(testInstance, new AllocationID()));
-		} catch (UnknownHostException e) {
-			throw new RuntimeException(StringUtils.stringifyException(e));
-		}
-	}
-
-
-	@Override
-	public void requestInstance(final JobID jobID, final Configuration conf,
-								int requiredSlots) throws InstanceException {
-
-		if (this.instanceListener == null) {
-			throw new InstanceException("instanceListener not registered with TestInstanceManager");
-		}
-
-		final InstanceListener il = this.instanceListener;
-
-		final Runnable runnable = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-				il.resourcesAllocated(jobID, allocatedResources);
-			}
-		};
-
-		new Thread(runnable).start();
-	}
-
-
-	@Override
-	public void releaseAllocatedResource(final AllocatedResource allocatedResource) throws InstanceException {
-		++this.numberOfReleaseCalls;
-	}
-
-	/**
-	 * Returns the number of times the method releaseAllocatedResource has been called.
-	 * 
-	 * @return the number of times the method releaseAllocatedResource has been called
-	 */
-	int getNumberOfReleaseMethodCalls() {
-		return this.numberOfReleaseCalls;
-	}
-
-
-	@Override
-	public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo) {
-		throw new IllegalStateException("reportHeartBeat called on TestInstanceManager");
-	}
-
-	@Override
-	public void registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
-									final HardwareDescription hardwareDescription, int numberSlots){
-		throw new IllegalStateException("registerTaskManager called on TestInstanceManager.");
-	}
-
-	@Override
-	public NetworkTopology getNetworkTopology(final JobID jobID) {
-		throw new IllegalStateException("getNetworkTopology called on TestInstanceManager");
-	}
-
-
-	@Override
-	public void setInstanceListener(final InstanceListener instanceListener) {
-
-		this.instanceListener = instanceListener;
-	}
-
-	@Override
-	public Instance getInstanceByName(final String name) {
-		throw new IllegalStateException("getInstanceByName called on TestInstanceManager");
-	}
-
-	@Override
-	public void shutdown() {
-		throw new IllegalStateException("shutdown called on TestInstanceManager");
-	}
-
-	@Override
-	public int getNumberOfTaskManagers() {
-		throw new IllegalStateException("getNumberOfTaskTrackers called on TestInstanceManager");
-	}
-
-	@Override
-	public int getNumberOfSlots() {
-		return this.testInstance.getNumberOfSlots();
-	}
-
-
-	@Override
-	public Map<InstanceConnectionInfo, Instance> getInstances() {
-		throw new IllegalStateException("getInstances called on TestInstanceManager");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
index 5b76d53..2b2c81d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.jobmanager.splitassigner;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
index ddad0d3..fdc1cf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.jobmanager.splitassigner;