You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:55 UTC

[31/63] [abbrv] git commit: Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model

Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2d6199ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2d6199ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2d6199ff

Branch: refs/heads/master
Commit: 2d6199fff877b0532903a4b2ff2d5279671b33cb
Parents: c32569a
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 20 13:11:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |    4 +-
 .../api/common/io/FileInputFormatTest.java      |    1 -
 .../runtime/executiongraph/ExecutionVertex.java |    4 +
 .../executiongraph/ExecutionVertex2.java        |   25 +
 .../runtime/instance/AllocatedResource.java     |  163 --
 .../flink/runtime/instance/AllocatedSlot.java   |   73 +-
 .../instance/DefaultInstanceManager.java        |  134 +-
 .../flink/runtime/instance/DummyInstance.java   |   64 -
 .../runtime/instance/HardwareDescription.java   |   10 +
 .../apache/flink/runtime/instance/Instance.java |  378 ++---
 .../instance/InstanceConnectionInfo.java        |    2 +-
 .../runtime/instance/InstanceDiedException.java |   34 +
 .../runtime/instance/InstanceException.java     |   42 -
 .../runtime/instance/InstanceListener.java      |   33 +-
 .../runtime/instance/InstanceNotifier.java      |   77 -
 .../scheduler/DefaultExecutionListener.java     |  133 --
 .../jobmanager/scheduler/DefaultScheduler.java  | 1568 ++++++++++--------
 .../scheduler/InstanceFillDegreeComparator.java |   31 +
 .../jobmanager/scheduler/LifoSetQueue.java      |  110 ++
 .../scheduler/NoResourceAvailableException.java |   33 +
 .../jobmanager/scheduler/ResourceId.java        |   20 +
 .../jobmanager/scheduler/ScheduledUnit.java     |   67 +
 .../scheduler/SchedulingException.java          |   44 -
 .../scheduler/SchedulingStrategy.java           |   33 +
 .../runtime/protocols/JobManagerProtocol.java   |   22 +-
 .../flink/runtime/taskmanager/TaskManager.java  |  370 +++--
 .../RegisterTaskManagerResult.java              |   56 -
 .../instance/LocalInstanceManagerTest.java      |   11 -
 .../scheduler/DefaultSchedulerTest.java         |  298 ++--
 .../jobmanager/scheduler/LifoSetQueueTest.java  |  128 ++
 .../scheduler/TestDeploymentManager.java        |  108 --
 .../scheduler/TestInstanceManager.java          |  194 ---
 .../splitassigner/DefaultSplitAssignerTest.java |   25 +-
 .../LocatableSplitAssignerTest.java             |   25 +-
 34 files changed, 2081 insertions(+), 2239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4fba186..b2af886 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -384,9 +384,9 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
 
 	/**
-	 * The default interval for TaskManager heart beats (2000 msecs).
+	 * The default interval for TaskManager heart beats (5000 msecs).
 	 */
-	public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 2000;
+	public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 5000;
 
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 9e93e67..707ecca 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class FileInputFormatTest { 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 72e0696..d1ee262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -1000,4 +1000,8 @@ public final class ExecutionVertex {
 
 		return tdd;
 	}
+	
+	public void handleException(Throwable t) {
+		
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
new file mode 100644
index 0000000..ab33ca0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -0,0 +1,25 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.executiongraph;
+
+public class ExecutionVertex2 {
+
+	
+	
+	public void handleException(Throwable t) {
+		t.printStackTrace();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
deleted file mode 100644
index c626309..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-
-/**
- * An allocated resource object unambiguously defines the
- * hardware resources which have been assigned to an {@link org.apache.flink.runtime.executiongraph.ExecutionVertex} for
- * executing a task. The allocated resource is comprised of an {@link Instance}
- * which identifies the node the task is scheduled to run on as well as an
- * {@link org.apache.flink.runtime.instance.AllocationID} which determines the resources the task is scheduled to
- * allocate within the node.
- * <p>
- * The class is thread-safe.
- * 
- */
-public final class AllocatedResource {
-
-	/**
-	 * The instance a task is scheduled to run on.
-	 */
-	private final Instance instance;
-
-	/**
-	 * The allocation ID identifying the resources within the instance
-	 * which the task is expected to run on.
-	 */
-	private final AllocationID allocationID;
-
-	/**
-	 * The set stores the execution vertices which are currently scheduled to run this resource.
-	 */
-	private final Set<ExecutionVertex> assignedVertices = Collections
-		.newSetFromMap(new ConcurrentHashMap<ExecutionVertex, Boolean>());
-
-	/**
-	 * Constructs a new allocated resource object.
-	 * 
-	 * @param instance
-	 *        the instance a task is scheduled to run on.
-	 * @param allocationID
-	 *        the allocation ID identifying the allocated resources within the instance
-	 */
-	public AllocatedResource(final Instance instance, final AllocationID allocationID) {
-		this.instance = instance;
-		this.allocationID = allocationID;
-	}
-
-	/**
-	 * Returns the instance a task is scheduled to run on.
-	 *
-	 * @return the instance a task is scheduled to run on
-	 */
-	public Instance getInstance() {
-		return this.instance;
-	}
-
-	/**
-	 * Returns the allocation ID which identifies the resource allocated within the assigned instance.
-	 * 
-	 * @return the allocation ID or <code>null</code> if the assigned instance is of type {@link DummyInstance}
-	 */
-	public AllocationID getAllocationID() {
-		return this.allocationID;
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (obj instanceof AllocatedResource) {
-
-			final AllocatedResource allocatedResource = (AllocatedResource) obj;
-			if (!this.instance.equals(allocatedResource.getInstance())) {
-				return false;
-			}
-
-			if (this.allocationID == null) {
-				if (allocatedResource.getAllocationID() != null) {
-					return false;
-				}
-			} else {
-				if (!this.allocationID.equals(allocatedResource.getAllocationID())) {
-					return false;
-				}
-			}
-
-			return true;
-		}
-
-		return false;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		if (this.allocationID == null) {
-			return 0;
-		}
-
-		return this.allocationID.hashCode();
-	}
-
-	/**
-	 * Assigns the given execution vertex to this allocated resource.
-	 * 
-	 * @param vertex
-	 *        the vertex to assign to this resource
-	 */
-	public void assignVertexToResource(final ExecutionVertex vertex) {
-
-		if (!this.assignedVertices.add(vertex)) {
-			throw new IllegalStateException("The vertex " + vertex + " has already been assigned to resource " + this);
-		}
-	}
-
-	/**
-	 * Returns an iterator over all execution vertices currently assigned to this allocated resource.
-	 * 
-	 * @return an iterator over all execution vertices currently assigned to this allocated resource
-	 */
-	public Iterator<ExecutionVertex> assignedVertices() {
-
-		return this.assignedVertices.iterator();
-	}
-
-	/**
-	 * Removes the given execution vertex from this allocated resource.
-	 * 
-	 * @param vertex
-	 *        the execution to be removed
-	 */
-	public void removeVertexFromResource(final ExecutionVertex vertex) {
-
-		if (!this.assignedVertices.remove(vertex)) {
-			throw new IllegalStateException("The vertex " + vertex + " has not been assigned to resource " + this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index d85bf39..71af9db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -16,56 +16,67 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
 
 /**
- * An allocated slot is a part of an instance which is assigned to a job.
- * <p>
- * This class is thread-safe.
- * 
+ * An allocated slot is the unit in which resources are allocated on instances.
  */
 public class AllocatedSlot {
 
-	/**
-	 * The allocation ID which identifies the resources occupied by this slot.
-	 */
-	private final AllocationID allocationID;
+	/** The ID which identifies the resources occupied by this slot. */
+	private final ResourceId resourceId;
 
-	/**
-	 * The ID of the job this slice belongs to.
-	 */
+	/** The ID of the job this slot belongs to. */
 	private final JobID jobID;
+	
+	/** The instance on which the slot is allocated */
+	private final Instance instance;
+	
+	/** The number of the slot on which the task is deployed */
+	private final int slotNumber;
 
-	/**
-	 * Creates a new allocated slice on the given hosting instance.
-	 * 
-	 * @param jobID
-	 *        the ID of the job this slice belongs to
-	 */
-	public AllocatedSlot(final JobID jobID) {
 
-		this.allocationID = new AllocationID();
+	public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
+		this.resourceId = resourceId;
 		this.jobID = jobID;
+		this.instance = instance;
+		this.slotNumber = slotNumber;
 	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
-	 * Returns the allocation ID of this slice.
-	 * 
-	 * @return the allocation ID of this slice
-	 */
-	public AllocationID getAllocationID() {
-		return this.allocationID;
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slice belongs to.
+	 * Returns the ID of the job this allocated slot belongs to.
 	 * 
-	 * @return the ID of the job this allocated slice belongs to
+	 * @return the ID of the job this allocated slot belongs to
 	 */
 	public JobID getJobID() {
 		return this.jobID;
 	}
+	
+	public ResourceId getResourceId() {
+		return resourceId;
+	}
+	
+	public Instance getInstance() {
+		return instance;
+	}
+	
+	public int getSlotNumber() {
+		return slotNumber;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public void runTask(ExecutionVertex2 vertex) {
+		
+	}
+	
+	public void cancelResource() {
+		
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index b19adbb..eca23c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.instance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 
@@ -54,6 +59,9 @@ public class DefaultInstanceManager implements InstanceManager {
 	
 	/** Set of hosts that were present once and have died */
 	private final Set<InstanceConnectionInfo> deadHosts;
+	
+	/** Listeners that want to be notified about availability and disappearance of instances */
+	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
 
 	/** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
 	private final long heartbeatTimeout;
@@ -106,7 +114,7 @@ public class DefaultInstanceManager implements InstanceManager {
 			this.cleanupStaleMachines.cancel();
 
 			for (Instance i : this.registeredHostsById.values()) {
-				i.destroy();
+				i.markDead();
 			}
 			
 			this.registeredHostsById.clear();
@@ -183,6 +191,9 @@ public class DefaultInstanceManager implements InstanceManager {
 
 			host.reportHeartBeat();
 			
+			// notify all listeners (for example the scheduler)
+			notifyNewInstance(host);
+			
 			return id;
 		}
 	}
@@ -204,48 +215,99 @@ public class DefaultInstanceManager implements InstanceManager {
 	
 	// --------------------------------------------------------------------------------------------
 	
+	public void addInstanceListener(InstanceListener listener) {
+		synchronized (this.instanceListeners) {
+			this.instanceListeners.add(listener);
+		}
+	}
+	
+	public void removeInstanceListener(InstanceListener listener) {
+		synchronized (this.instanceListeners) {
+			this.instanceListeners.remove(listener);
+		}
+	}
+	
+	private void notifyNewInstance(Instance instance) {
+		synchronized (this.instanceListeners) {
+			for (InstanceListener listener : this.instanceListeners) {
+				try {
+					listener.newInstanceAvailable(instance);
+				}
+				catch (Throwable t) {
+					LOG.error("Notification of new instance availability failed.", t);
+				}
+			}
+		}
+	}
+	
+	private void notifyDeadInstance(Instance instance) {
+		synchronized (this.instanceListeners) {
+			for (InstanceListener listener : this.instanceListeners) {
+				try {
+					listener.instanceDied(instance);
+				}
+				catch (Throwable t) {
+					LOG.error("Notification of dead instance failed.", t);
+				}
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private void checkForDeadInstances() {
+		final long now = System.currentTimeMillis();
+		final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
+		
+		synchronized (DefaultInstanceManager.this.lock) {
+			if (DefaultInstanceManager.this.shutdown) {
+				return;
+			}
+
+			final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
+			
+			// check each host for missing heart-beat messages
+			while (entries.hasNext()) {
+				
+				final Map.Entry<InstanceID, Instance> entry = entries.next();
+				final Instance host = entry.getValue();
+				
+				if (!host.isStillAlive(now, timeout)) {
+					
+					// remove from the living
+					entries.remove();
+					registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
+					
+					// add to the dead
+					deadHosts.add(host.getInstanceConnectionInfo());
+					
+					host.markDead();
+					
+					totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
+					
+					LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
+							host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
+					
+					// report to all listeners
+					notifyDeadInstance(host);
+				}
+			}
+		}
+	}
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Periodic task that checks whether hosts have not sent their heart-beat
 	 * messages and purges the hosts in this case.
 	 */
 	private final TimerTask cleanupStaleMachines = new TimerTask() {
-
 		@Override
 		public void run() {
-
-			final long now = System.currentTimeMillis();
-			final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
-			
-			synchronized (DefaultInstanceManager.this.lock) {
-				if (DefaultInstanceManager.this.shutdown) {
-					return;
-				}
-
-				final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-				
-				// check all hosts whether they did not send heart-beat messages.
-				while (entries.hasNext()) {
-					
-					final Map.Entry<InstanceID, Instance> entry = entries.next();
-					final Instance host = entry.getValue();
-					
-					if (!host.isStillAlive(now, timeout)) {
-						
-						// remove from the living
-						entries.remove();
-						registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
-						
-						// add to the dead
-						deadHosts.add(host.getInstanceConnectionInfo());
-						
-						host.markDied();
-						
-						totalNumberOfAliveTaskSlots -= host.getNumberOfSlots();
-						
-						LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
-								host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
-					}
-				}
+			try {
+				checkForDeadInstances();
+			}
+			catch (Throwable t) {
+				LOG.error("Checking for dead instances failed.", t);
 			}
 		}
 	};

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
deleted file mode 100644
index af965c0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-/**
- * A DummyInstance is a stub implementation of the {@link Instance} interface.
- * Dummy instances are used to plan a job execution but must be replaced with
- * concrete instances before the job execution starts.
- * 
- */
-public class DummyInstance extends Instance {
-
-	private static int nextID = 0;
-
-	private final String name;
-
-	public static synchronized DummyInstance createDummyInstance() {
-
-		return new DummyInstance(nextID++);
-	}
-
-	/**
-	 * Constructs a new dummy instance of the given instance type.
-	 * 
-	 * @param id
-	 *        the ID of the dummy instance
-	 */
-	private DummyInstance(int id) {
-		super(null, null, null, null, 0);
-
-		this.name = "DummyInstance_" + Integer.toString(id);
-	}
-
-
-	@Override
-	public String toString() {
-
-		return this.name;
-	}
-
-
-	@Override
-	public HardwareDescription getHardwareDescription() {
-
-		throw new RuntimeException("getHardwareDescription is called on a DummyInstance");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index affbdd6..32d6572 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -121,6 +121,16 @@ public final class HardwareDescription implements IOReadableWritable, java.io.Se
 	}
 	
 	// --------------------------------------------------------------------------------------------
+	// Utils
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return String.format("cores=%d, physMem=%d, heap=%d, managed=%d", 
+				numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
+	}
+	
+	// --------------------------------------------------------------------------------------------
 	// Factory
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index c895bbb..3d39c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -16,38 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.instance;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
 import java.util.Set;
-import java.util.Collection;
 
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
+import org.eclipse.jetty.util.log.Log;
 
 /**
  * An instance represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
  */
 public class Instance {
 	
+	/** The lock on which to synchronize allocations and failure state changes */
+	private final Object instanceLock = new Object();
+	
 	/** The connection info to connect to the task manager represented by this instance. */
 	private final InstanceConnectionInfo instanceConnectionInfo;
 	
@@ -60,20 +52,21 @@ public class Instance {
 	/** The number of task slots available on the node */
 	private final int numberOfSlots;
 
-	/**
-	 * Allocated slots on this instance
-	 */
-	private final Map<AllocationID, AllocatedSlot> allocatedSlots = new HashMap<AllocationID, AllocatedSlot>();
+	
+	private final Queue<Integer> availableSlots;
+	
+	/** Allocated slots on this instance */
+	private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
 
-	/**
-	 * Stores the RPC stub object for the instance's task manager.
-	 */
-	private TaskOperationProtocol taskManager = null;
+	/** The RPC proxy to send calls to the task manager represented by this instance */
+	private volatile TaskOperationProtocol taskManager ;
 
 	/**
 	 * Time when last heat beat has been received from the task manager running on this instance.
 	 */
 	private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
+	
+	private volatile boolean isDead;
 
 	/**
 	 * Constructs an abstract instance object.
@@ -88,125 +81,108 @@ public class Instance {
 		this.instanceId = id;
 		this.resources = resources;
 		this.numberOfSlots = numberOfSlots;
+		
+		this.availableSlots = new ArrayDeque<Integer>();
+		for (int i = 0; i < numberOfSlots; i++) {
+			this.availableSlots.add(i);
+		}
 	}
 
-	/**
-	 * Creates or returns the RPC stub object for the instance's task manager.
-	 * 
-	 * @return the RPC stub object for the instance's task manager
-	 * @throws IOException
-	 *         thrown if the RPC stub object for the task manager cannot be created
-	 */
-	private TaskOperationProtocol getTaskManagerProxy() throws IOException {
-
-		if (this.taskManager == null) {
-
-			this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
-				new InetSocketAddress(getInstanceConnectionInfo().address(),
-					getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+	public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+		TaskOperationProtocol tm = this.taskManager;
+		
+		if (tm == null) {
+			synchronized (this) {
+				if (this.taskManager == null) {
+					this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+						new InetSocketAddress(getInstanceConnectionInfo().address(),
+							getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+				}
+				tm = this.taskManager;
+			}
 		}
-
-		return this.taskManager;
+		
+		return tm;
 	}
 
-	/**
-	 * Destroys and removes the RPC stub object for this instance's task manager.
-	 */
+	/** Stops the RPC proxy of this instance's task manager, if one exists; errors during shutdown are logged and ignored. */
 	private void destroyTaskManagerProxy() {
-
-		if (this.taskManager != null) {
-			RPC.stopProxy(this.taskManager);
-			this.taskManager = null;
+		synchronized (this) {
+			if (this.taskManager != null) {
+				try {
+					RPC.stopProxy(this.taskManager);
+				} catch (Throwable t) {
+					Log.debug("Error shutting down RPC proxy.", t);
+				}
+			}
 		}
 	}
 
-	/**
-	 * Returns the instance's connection information object.
-	 * 
-	 * @return the instance's connection information object
-	 */
-	public final InstanceConnectionInfo getInstanceConnectionInfo() {
-		return this.instanceConnectionInfo;
-	}
 
-	/**
-	 * Checks if all the libraries required to run the job with the given
-	 * job ID are available on this instance. Any libary that is missing
-	 * is transferred to the instance as a result of this call.
-	 * 
-	 * @param jobID
-	 *        the ID of the job whose libraries are to be checked for
-	 * @throws IOException
-	 *         thrown if an error occurs while checking for the libraries
-	 */
-	public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
-
-		// Now distribute the required libraries for the job
-		String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
-
-		if (requiredLibraries == null) {
-			throw new IOException("No entry of required libraries for job " + jobID);
+	
+	// --------------------------------------------------------------------------------------------
+	// Life and Death
+	// --------------------------------------------------------------------------------------------
+	
+	public boolean isAlive() {
+		return !isDead;
+	}
+	
+	public void markDead() { // no-op if the instance was already marked dead
+		if (isDead) {
+			return;
 		}
-
-		LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
-		request.setRequiredLibraries(requiredLibraries);
-
-		// Send the request
-		LibraryCacheProfileResponse response = null;
-		response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
-		// Check response and transfer libraries if necessary
-		for (int k = 0; k < requiredLibraries.length; k++) {
-			if (!response.isCached(k)) {
-				LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
-				getTaskManagerProxy().updateLibraryCache(update);
+		
+		isDead = true;
+		
+		synchronized (instanceLock) {
+			for (AllocatedSlot slot : allocatedSlots) {
+				slot.cancelResource();
 			}
+			this.allocatedSlots.clear(); // clear only after every slot has been cancelled
 		}
+		
+		destroyTaskManagerProxy();
 	}
-
-	/**
-	 * Submits a list of tasks to the instance's {@link org.apache.flink.runtime.taskmanager.TaskManager}.
-	 * 
-	 * @param tasks
-	 *        the list of tasks to be submitted
-	 * @return the result of the submission attempt
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the task
-	 */
-	public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks) throws IOException {
-		return getTaskManagerProxy().submitTasks(tasks);
+	
+	// --------------------------------------------------------------------------------------------
+	// Properties
+	// --------------------------------------------------------------------------------------------
+	
+	public InstanceID getId() {
+		return instanceId;
 	}
-
+	
+	public HardwareDescription getResources() {
+		return this.resources;
+	}
+	
+	public int getTotalNumberOfSlots() {
+		return numberOfSlots;
+	}
+	
 	/**
-	 * Cancels the task identified by the given ID at the instance's
-	 * {@link org.apache.flink.runtime.taskmanager.TaskManager}.
+	 * Returns the instance's connection information object.
 	 * 
-	 * @param id
-	 *        the ID identifying the task to be canceled
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request or receiving the response
-	 * @return the result of the cancel attempt
+	 * @return the instance's connection information object
 	 */
-	public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
-
-		return getTaskManagerProxy().cancelTask(id);
+	public InstanceConnectionInfo getInstanceConnectionInfo() {
+		return this.instanceConnectionInfo;
 	}
-
+	
+	// --------------------------------------------------------------------------------------------
+	// Heartbeats
+	// --------------------------------------------------------------------------------------------
+	
 	/**
-	 * Kills the task identified by the given ID at the instance's
-	 * {@link org.apache.flink.runtime.taskmanager.TaskManager}.
+	 * Gets the timestamp of the last heartbeat.
 	 * 
-	 * @param id
-	 *        the ID identifying the task to be killed
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request or receiving the response
-	 * @return the result of the kill attempt
+	 * @return The timestamp of the last heartbeat.
 	 */
-	public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
-		return getTaskManagerProxy().killTask(id);
+	public long getLastHeartBeat() {
+		return this.lastReceivedHeartBeat;
 	}
-
+	
 	/**
 	 * Updates the time of last received heart beat to the current system time.
 	 */
@@ -214,141 +190,53 @@ public class Instance {
 		this.lastReceivedHeartBeat = System.currentTimeMillis();
 	}
 
-	public boolean isStillAlive(long now, long cleanUpInterval) {
-		return this.lastReceivedHeartBeat + cleanUpInterval > now;
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		// Fall back since dummy instances do not have a instanceConnectionInfo
-		if (this.instanceConnectionInfo == null) {
-			return super.equals(obj);
-		}
-
-		if (!(obj instanceof Instance)) {
-			return false;
-		}
-
-		final Instance abstractInstance = (Instance) obj;
-
-		return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		// Fall back since dummy instances do not have a instanceConnectionInfo
-		if (this.instanceConnectionInfo == null) {
-			return super.hashCode();
-		}
-
-		return this.instanceConnectionInfo.hashCode();
-	}
-
-	/**
-	 * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request
-	 */
-	public synchronized void logBufferUtilization() throws IOException {
-
-		getTaskManagerProxy().logBufferUtilization();
-	}
-
-	/**
-	 * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
-	 * tolerance mechanisms.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the request
-	 */
-	public synchronized void killTaskManager() throws IOException {
-
-		getTaskManagerProxy().killTaskManager();
-	}
-
-	/**
-	 * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
-	 * 
-	 * @param channelIDs
-	 *        the channel IDs identifying the cache entries to invalidate
-	 * @throws IOException
-	 *         thrown if an error occurs during this remote procedure call
-	 */
-	public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
-		getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
-	}
-
 	/**
-	 * Destroys all RPC stub objects attached to this instance.
+	 * Checks whether the last heartbeat occurred within the last {@code cleanUpInterval}
+	 * milliseconds before the given timestamp {@code now}.
+	 *  
+	 * @param now The timestamp representing the current time.
+	 * @param cleanUpInterval The maximum time (in msecs) that the last heartbeat may lie in the past.
+	 * @return True, if this instance is considered alive, false otherwise.
 	 */
-	public synchronized void destroyProxies() {
-
-		destroyTaskManagerProxy();
-
-	}
-
-	public int getNumberOfSlots() {
-		return numberOfSlots;
-	}
-
-	public int getNumberOfAvailableSlots() { return numberOfSlots - allocatedSlots.size(); }
-
-	public synchronized AllocatedResource allocateSlot(JobID jobID) throws InstanceException{
-		if(allocatedSlots.size() < numberOfSlots){
-			AllocatedSlot slot = new AllocatedSlot(jobID);
-
-			allocatedSlots.put(slot.getAllocationID(), slot);
-			return new AllocatedResource(this,slot.getAllocationID());
-		}else{
-			throw new InstanceException("Overbooking instance " + instanceConnectionInfo + ".");
-		}
-	}
-
-	public synchronized void releaseSlot(AllocationID allocationID) {
-		if(allocatedSlots.containsKey(allocationID)){
-			allocatedSlots.remove(allocationID);
-		}else{
-			throw new RuntimeException("There is no slot registered with allocation ID " + allocationID + ".");
-		}
-	}
-
-	public Collection<AllocatedSlot> getAllocatedSlots() {
-		return allocatedSlots.values();
-	}
-
-	public Collection<AllocatedSlot> removeAllocatedSlots() {
-		Collection<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
-
-		for(AllocatedSlot slot : slots){
-			releaseSlot(slot.getAllocationID());
-		}
-
-		return slots;
-	}
-
-	public long getLastHeartBeat() {
-		return this.lastReceivedHeartBeat;
+	public boolean isStillAlive(long now, long cleanUpInterval) {
+		return this.lastReceivedHeartBeat + cleanUpInterval > now;
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	// Resource allocation
+	// --------------------------------------------------------------------------------------------
 	
-	public void markDied() {
-		
+	public AllocatedSlot allocateSlot(JobID jobID, ResourceId resourceId) throws InstanceDiedException {
+		synchronized (instanceLock) {
+			if (isDead) {
+				throw new InstanceDiedException(this);
+			}
+			
+			Integer nextSlot = availableSlots.poll();
+			if (nextSlot == null) {
+				return null;
+			} else {
+				AllocatedSlot slot = new AllocatedSlot(jobID, resourceId, this, nextSlot);
+				allocatedSlots.add(slot);
+				return slot;
+			}
+		}
 	}
 	
-	public void destroy() {
-		
+	public int getNumberOfAvailableSlots() {
+		return this.availableSlots.size();
 	}
 	
-	public InstanceID getId() {
-		return instanceId;
+	public boolean hasResourcesAvailable() {
+		return !isDead && getNumberOfAvailableSlots() > 0;
 	}
 	
-	public HardwareDescription getResources() {
-		return this.resources;
+	// --------------------------------------------------------------------------------------------
+	// Standard Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "Instance (" + this.instanceConnectionInfo + "), resources: " + this.resources + ", numberOfSlots=" + numberOfSlots;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index daf7e0d..4cbb2a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -183,7 +183,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	}
 
 	// --------------------------------------------------------------------------------------------
-	// Serialization
+	// Utilities
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
new file mode 100644
index 0000000..42b9817
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
@@ -0,0 +1,34 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.instance;
+
+/**
+ * A special exception signaling that an attempted operation on an instance is not possible,
+ * because the instance has died.
+ */
+public class InstanceDiedException extends Exception {
+	private static final long serialVersionUID = -4917918318403135745L;
+	
+	private final Instance instance;
+
+	public InstanceDiedException(Instance instance) {
+		this.instance = instance;
+	}
+	
+	public Instance getInstance() {
+		return instance;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
deleted file mode 100644
index 61d5868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-/**
- * An instance exception is thrown if the allocation, assignment or deallocation of an instance fails.
- * 
- */
-public class InstanceException extends Exception {
-
-	/**
-	 * The generated serial UID.
-	 */
-	private static final long serialVersionUID = 3463832262505896962L;
-
-	/**
-	 * Constructs a new instance exception with the given error message.
-	 * 
-	 * @param errorMsg
-	 *        the error message to be included in the exception.
-	 */
-	public InstanceException(String errorMsg) {
-		super(errorMsg);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
index fbdef54..76e63b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
@@ -16,38 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.instance;
 
-import java.util.List;
-
-import org.apache.flink.runtime.jobgraph.JobID;
-
 /**
- * Classes implementing the {@link InstanceListener} interface can be notified about
- * the availability or the unexpected failure of an instance.
- * 
+ * Classes implementing the InstanceListener interface can be notified about
+ * the availability or disappearance of instances.
  */
 public interface InstanceListener {
 
-	/**
-	 * Called if one or more requested resources have become available.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the initial request has been triggered for
-	 * @param allocatedResources
-	 *        the resources which have been allocated as a response to the initial request
-	 */
-	void resourcesAllocated(JobID jobID, List<AllocatedResource> allocatedResources);
-
-	/**
-	 * Called if one or more allocated resources assigned to at least one job have died unexpectedly.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the instance is used for
-	 * @param allocatedResource
-	 *        the allocated resources which are affected by the instance death
-	 */
-	void allocatedResourcesDied(JobID jobID, List<AllocatedResource> allocatedResource);
-
+	void newInstanceAvailable(Instance instance);
+	
+	void instanceDied(Instance instance);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
deleted file mode 100644
index 9e85a83..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import java.util.List;
-
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification
- * about the availability of an {@link org.apache.flink.runtime.instance.Instance} to the given {@link
- * InstanceListener} object. The notification must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSIGNING could not be guaranteed.
- * This class is thread-safe.
- * 
- */
-public class InstanceNotifier extends Thread {
-
-	/**
-	 * The {@link InstanceListener} object to send the notification to.
-	 */
-	private final InstanceListener instanceListener;
-
-	/**
-	 * The ID of the job the notification refers to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * The allocated resources the notification refers to.
-	 */
-	private final List<AllocatedResource> allocatedResources;
-
-	/**
-	 * Constructs a new instance notifier object.
-	 * 
-	 * @param instanceListener
-	 *        the listener to send the notification to
-	 * @param jobID
-	 *        the ID of the job the notification refers to
-	 * @param allocatedResources
-	 *        the resources with has been allocated for the job
-	 */
-	public InstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
-							final List<AllocatedResource> allocatedResources) {
-		this.instanceListener = instanceListener;
-		this.jobID = jobID;
-		this.allocatedResources = allocatedResources;
-	}
-
-
-	@Override
-	public void run() {
-
-		this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
deleted file mode 100644
index d0bbdca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionPipeline;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-public class DefaultExecutionListener implements ExecutionListener {
-
-	/**
-	 * The instance of the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler}.
-	 */
-	private final DefaultScheduler scheduler;
-
-	/**
-	 * The {@link ExecutionVertex} this wrapper object belongs to.
-	 */
-	private final ExecutionVertex executionVertex;
-
-	/**
-	 * Constructs a new wrapper object for the given {@link ExecutionVertex}.
-	 * 
-	 * @param scheduler
-	 *        the instance of the {@link DefaultScheduler}
-	 * @param executionVertex
-	 *        the {@link ExecutionVertex} the received notification refer to
-	 */
-	public DefaultExecutionListener(final DefaultScheduler scheduler, final ExecutionVertex executionVertex) {
-		this.scheduler = scheduler;
-		this.executionVertex = executionVertex;
-	}
-
-
-	@Override
-	public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
-			final ExecutionState newExecutionState, final String optionalMessage) {
-
-		final ExecutionGraph eg = this.executionVertex.getExecutionGraph();
-
-		// Check if we can deploy a new pipeline.
-		if (newExecutionState == ExecutionState.FINISHING) {
-
-			final ExecutionPipeline pipeline = this.executionVertex.getExecutionPipeline();
-			if (!pipeline.isFinishing()) {
-				// Some tasks of the pipeline are still running
-				return;
-			}
-
-			// Find another vertex in the group which is still in SCHEDULED state and get its pipeline.
-			final ExecutionGroupVertex groupVertex = this.executionVertex.getGroupVertex();
-			for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-				final ExecutionVertex groupMember = groupVertex.getGroupMember(i);
-				if (groupMember.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
-
-					final ExecutionPipeline pipelineToBeDeployed = groupMember.getExecutionPipeline();
-					pipelineToBeDeployed.setAllocatedResource(this.executionVertex.getAllocatedResource());
-					pipelineToBeDeployed.updateExecutionState(ExecutionState.ASSIGNED);
-
-					this.scheduler.deployAssignedPipeline(pipelineToBeDeployed);
-					return;
-				}
-			}
-		}
-
-		if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {
-
-			synchronized (eg) {
-
-				if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {
-
-					if (eg.getJobStatus() == InternalJobStatus.FAILING) {
-						return;
-					}
-
-					this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
-
-					// Run through the deployment procedure
-					this.scheduler.deployAssignedVertices(this.executionVertex);
-					return;
-				}
-			}
-		}
-
-		if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
-			|| newExecutionState == ExecutionState.FAILED) {
-			// Check if instance can be released
-			this.scheduler.checkAndReleaseAllocatedResource(eg, this.executionVertex.getAllocatedResource());
-		}
-	}
-
-
-	@Override
-	public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// Nothing to do here
-	}
-
-
-	@Override
-	public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// Nothing to do here
-	}
-
-
-	@Override
-	public int getPriority() {
-
-		return 0;
-	}
-}