You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:49 UTC

[25/63] [abbrv] git commit: Redesign Scheduler part 2

Redesign Scheduler part 2


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e6aadfcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e6aadfcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e6aadfcc

Branch: refs/heads/master
Commit: e6aadfccdaf02b9df65d10ac835cab7fd26e274e
Parents: aa7550a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 25 03:09:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../flink/client/CliFrontendListCancelTest.java |   7 -
 .../org/apache/flink/runtime/AbstractID.java    |  49 +-
 .../runtime/execution/ExecutionListener.java    |  11 -
 .../runtime/executiongraph/ExecutionGraph.java  |  11 +-
 .../executiongraph/ExecutionVertex2.java        |  63 +-
 .../VertexAssignmentListener.java               |  40 --
 .../flink/runtime/filecache/FileCache.java      |   2 +-
 .../flink/runtime/instance/AllocatedSlot.java   |  84 ++-
 .../flink/runtime/instance/AllocationID.java    |  32 -
 .../apache/flink/runtime/instance/Hardware.java |   2 +-
 .../apache/flink/runtime/instance/Instance.java | 202 ++++--
 .../instance/InstanceConnectionInfo.java        |   6 +-
 .../flink/runtime/instance/InstanceID.java      |   6 +-
 .../runtime/instance/InstanceListener.java      |  10 +
 .../flink/runtime/jobgraph/JobVertexID.java     |   6 +-
 .../flink/runtime/jobmanager/JobManager.java    | 102 +--
 .../jobmanager/scheduler/DefaultScheduler.java  | 528 ++++++++--------
 .../scheduler/InstanceFillDegreeComparator.java |  31 -
 .../jobmanager/scheduler/ScheduledUnit.java     |  68 +-
 .../scheduler/SchedulingStrategy.java           |  33 -
 .../runtime/jobmanager/scheduler/SetQueue.java  | 134 ++++
 .../jobmanager/scheduler/SharedSlot.java        |  99 +++
 .../scheduler/SlotAllocationFuture.java         |  94 +++
 .../scheduler/SlotAllocationFutureAction.java   |  34 +
 .../scheduler/SlotAvailablilityListener.java    |  29 +
 .../jobmanager/scheduler/SlotSharingGroup.java  |  70 ++
 .../scheduler/SlotSharingGroupAssignment.java   | 270 ++++++++
 .../runtime/jobmanager/scheduler/SubSlot.java   |  69 ++
 .../jobmanager/web/SetupInfoServlet.java        |   2 +-
 .../protocols/ExtendedManagementProtocol.java   |  10 -
 .../protocols/TaskOperationProtocol.java        |   5 -
 .../taskmanager/ExecutorThreadFactory.java      |  41 --
 .../flink/runtime/taskmanager/TaskManager.java  |   1 +
 .../runtime/ExecutorThreadFactory.java          |  41 --
 .../runtime/util/ExecutorThreadFactory.java     |  38 ++
 .../flink/runtime/util/NativeCodeLoader.java    | 129 ----
 .../apache/flink/runtime/AbstractIDTest.java    |  75 ++-
 .../flink/runtime/instance/HardwareTest.java    |  25 +-
 .../instance/InstanceConnectionInfoTest.java    |  25 +-
 .../flink/runtime/instance/InstanceTest.java    | 174 +++++
 .../instance/LocalInstanceManagerTest.java      |   4 -
 .../scheduler/DefaultSchedulerTest.java         | 180 ------
 .../scheduler/SchedulerIsolatedTasksTest.java   | 385 +++++++++++
 .../scheduler/SchedulerSlotSharingTest.java     | 633 +++++++++++++++++++
 .../scheduler/SchedulerTestUtils.java           | 145 +++++
 .../jobmanager/scheduler/SetQueueTest.java      | 110 ++++
 .../jobmanager/scheduler/SharedSlotsTest.java   | 118 ++++
 .../scheduler/SlotAllocationFutureTest.java     | 177 ++++++
 48 files changed, 3328 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index fa4d9a3..f335745 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
 import static org.junit.Assert.assertTrue;
@@ -28,7 +27,6 @@ import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.flink.client.CliFrontend;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.runtime.client.JobCancelResult;
 import org.apache.flink.runtime.client.JobProgressResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
@@ -197,11 +195,6 @@ public class CliFrontendListCancelTest {
 		}
 
 		@Override
-		public void killInstance(StringRecord instanceName) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
 		public void logBufferUtilization(JobID jobID) throws IOException {
 			throw new UnsupportedOperationException();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index 73cd8fc..458907c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -32,7 +32,7 @@ import io.netty.buffer.ByteBuf;
 /**
  * A statistically unique identification number.
  */
-public class AbstractID implements IOReadableWritable {
+public class AbstractID implements IOReadableWritable, Comparable<AbstractID> {
 
 	/** The size of a long in bytes */
 	private static final int SIZE_OF_LONG = 8;
@@ -139,24 +139,8 @@ public class AbstractID implements IOReadableWritable {
 		this.lowerPart = src.lowerPart;
 		this.upperPart = src.upperPart;
 	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof AbstractID) {
-			AbstractID src = (AbstractID) obj;
-			return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return ((int)  this.lowerPart) ^
-				((int) (this.lowerPart >>> 32)) ^
-				((int)  this.upperPart) ^
-				((int) (this.upperPart >>> 32));
-	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void read(DataInputView in) throws IOException {
@@ -180,6 +164,26 @@ public class AbstractID implements IOReadableWritable {
 		buf.writeLong(this.upperPart);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof AbstractID) {
+			AbstractID src = (AbstractID) obj;
+			return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return ((int)  this.lowerPart) ^
+				((int) (this.lowerPart >>> 32)) ^
+				((int)  this.upperPart) ^
+				((int) (this.upperPart >>> 32));
+	}
+	
 	@Override
 	public String toString() {
 		final byte[] ba = new byte[SIZE];
@@ -187,4 +191,11 @@ public class AbstractID implements IOReadableWritable {
 		longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
 		return StringUtils.byteToHexString(ba);
 	}
+	
+	@Override
+	public int compareTo(AbstractID o) {
+		int diff1 = Long.compare(this.upperPart, o.upperPart);
+		int diff2 = Long.compare(this.lowerPart, o.lowerPart);
+		return diff1 == 0 ? diff2 : diff1;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
index 305f47d..ded630f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution;
 
 import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
@@ -25,20 +24,10 @@ import org.apache.flink.runtime.jobgraph.JobID;
 /**
  * This interface must be implemented by classes which should be able to receive notifications about
  * changes of a task's execution state.
- * 
  */
 public interface ExecutionListener {
 
 	/**
-	 * Returns the priority of the execution listener object. If multiple execution listener objects are registered for
-	 * a given vertex, the priority determines in which order they will be called. Priorities are expressed as
-	 * non-negative integer values. The lower the integer value, the higher the call priority.
-	 * 
-	 * @return the priority of this execution listener
-	 */
-	int getPriority();
-
-	/**
 	 * Called when the execution state of the associated task has changed. It is important to point out that multiple
 	 * execution listeners can be invoked as a reaction to a state change, according to their priority. As a result, the
 	 * value of <code>newExecutionState</code> may be out-dated by the time a particular execution listener is called.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cea0271..bc95250 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.executiongraph;
 
 import java.util.ArrayList;
@@ -43,8 +42,6 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.DummyInstance;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.io.network.gates.GateID;
@@ -57,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobInputVertex;
 import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.StringUtils;
 
 /**
@@ -1291,12 +1288,6 @@ public class ExecutionGraph implements ExecutionListener {
 		return this.stages.iterator();
 	}
 
-
-	@Override
-	public int getPriority() {
-		return 1;
-	}
-
 	/**
 	 * Performs an asynchronous update operation to this execution graph.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index ab33ca0..09a6f5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -15,11 +15,70 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 public class ExecutionVertex2 {
 
+	private final JobVertexID jobVertexId;
+	
+			
+			
+	public ExecutionVertex2() {
+		this(new JobVertexID());
+	}
+	
+	public ExecutionVertex2(JobVertexID jobVertexId) {
+		this.jobVertexId = jobVertexId;
+	}
+	
+	
+	
+	public JobID getJobId() {
+		return new JobID();
+	}
+	
+	
+	public JobVertexID getJobvertexId() {
+		return this.jobVertexId;
+	}
+	
+	public String getTaskName() {
+		return "task";
+	}
+	
+	public int getTotalNumberOfParallelSubtasks() {
+		return 1;
+	}
+	
+	public int getParallelSubtaskIndex() {
+		return 0;
+	}
+
+	
+	// --------------------------------------------------------------------------------------------
+	//  Scheduling
+	// --------------------------------------------------------------------------------------------
+	
+	public Iterable<Instance> getPreferredLocations() {
+		return null;
+	}
+	
 	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
 	
-	public void handleException(Throwable t) {
-		t.printStackTrace();
+	/**
+	 * Creates a simple name representation in the style 'taskname (x/y)', where
+	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
+	 * 
+	 * @return A simple name representation.
+	 */
+	public String getSimpleName() {
+		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
deleted file mode 100644
index f09b10f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.AllocatedResource;
-
-/**
- * Classes implementing the {@link VertexAssignmentListener} interface can register for notifications about changes in
- * the assignment of an {@link ExecutionVertex} to an {@link AllocatedResource}.
- * 
- */
-public interface VertexAssignmentListener {
-
-	/**
-	 * Called when an {@link ExecutionVertex} has been assigned to an {@link AllocatedResource}.
-	 * 
-	 * @param id
-	 *        the ID of the vertex which has been reassigned
-	 * @param newAllocatedResource
-	 *        the allocated resource the vertex is now assigned to
-	 */
-	void vertexAssignmentChanged(ExecutionVertexID id, AllocatedResource newAllocatedResource);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 35562de..63b4d54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.taskmanager.runtime.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.IOUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 6289d45..cb7e658 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -18,20 +18,24 @@
 
 package org.apache.flink.runtime.instance;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
 
 /**
  * An allocated slot is the unit in which resources are allocated on instances.
  */
 public class AllocatedSlot {
+	
+	private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER = 
+			AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
+	 
+	private static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
+	private static final int CANCELLED = 1;					// no more tasks may run
+	private static final int RELEASED = 2;					// has been given back to the instance
 
-	/** The ID which identifies the resources occupied by this slot. */
-	private final ResourceId resourceId;
-
+	
 	/** The ID of the job this slice belongs to. */
 	private final JobID jobID;
 	
@@ -41,12 +45,15 @@ public class AllocatedSlot {
 	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 	
-	/** Flag that marks the schedule as active */
-	private final AtomicBoolean active = new AtomicBoolean(true);
+	private volatile int status = ALLOCATED_AND_ALIVE;
+	
 
 
-	public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
-		this.resourceId = resourceId;
+	public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
+		if (jobID == null || instance == null || slotNumber < 0) {
+			throw new IllegalArgumentException();
+		}
+		
 		this.jobID = jobID;
 		this.instance = instance;
 		this.slotNumber = slotNumber;
@@ -63,10 +70,6 @@ public class AllocatedSlot {
 		return this.jobID;
 	}
 	
-	public ResourceId getResourceId() {
-		return resourceId;
-	}
-	
 	public Instance getInstance() {
 		return instance;
 	}
@@ -87,7 +90,60 @@ public class AllocatedSlot {
 		return true;
 	}
 	
-	public void cancelResource() {
+	// --------------------------------------------------------------------------------------------
+	//  Status and life cycle
+	// --------------------------------------------------------------------------------------------
+	
+	public boolean isAlive() {
+		return status == ALLOCATED_AND_ALIVE;
+	}
+	
+	public boolean isCanceled() {
+		return status != ALLOCATED_AND_ALIVE;
+	}
+	
+	public boolean isReleased() {
+		return status == RELEASED;
+	}
+	
+	
+	public void cancel() {
+		if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
+			// kill all tasks currently running in this slot
+		}
+	}
+	
+	public void releaseSlot() {
+		// cancel everything, if there is something. since this is atomically status based,
+		// it will not happen twice if another attempt happened before or concurrently
+		cancel();
 		
+		this.instance.returnAllocatedSlot(this);
+	}
+	
+	protected boolean markReleased() {
+		return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return instance.getId() + " (" + slotNumber + ") - " + getStateName(status);
+	}
+	
+	private static final String getStateName(int state) {
+		switch (state) {
+		case ALLOCATED_AND_ALIVE:
+			return "ALLOCATED/ALIVE";
+		case CANCELLED:
+			return "CANCELLED";
+		case RELEASED:
+			return "RELEASED";
+		default:
+			return "(unknown)";
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
deleted file mode 100644
index c315dc3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.AbstractID;
-
-/**
- * An allocation ID unambiguously identifies the allocated resources
- * within an {@link Instance}. The ID is necessary if an {@link InstanceManager} decides to partition
- * {@link Instance}s
- * without the knowledge of Nephele's scheduler.
- * 
- */
-public class AllocationID extends AbstractID {
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
index 92ccfb2..aa61927 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
@@ -76,7 +76,7 @@ public class Hardware {
 				return -1;
 				
 			default:
-				LOG.error("Unrecognized OS");
+				LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());
 				return -1;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 3d39c8f..a168b2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -21,13 +21,15 @@ package org.apache.flink.runtime.instance;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailablilityListener;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.eclipse.jetty.util.log.Log;
@@ -52,14 +54,19 @@ public class Instance {
 	/** The number of task slots available on the node */
 	private final int numberOfSlots;
 
-	
+	/** A list of available slot positions */
 	private final Queue<Integer> availableSlots;
 	
 	/** Allocated slots on this instance */
 	private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
 
+	
+	/** A listener to be notified upon new slot availability */
+	private SlotAvailablilityListener slotListener;
+	
+	
 	/** The RPC proxy to send calls to the task manager represented by this instance */
-	private volatile TaskOperationProtocol taskManager ;
+	private volatile TaskOperationProtocol taskManager;
 
 	/**
 	 * Time when last heat beat has been received from the task manager running on this instance.
@@ -68,6 +75,8 @@ public class Instance {
 	
 	private volatile boolean isDead;
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Constructs an abstract instance object.
 	 * 
@@ -82,43 +91,36 @@ public class Instance {
 		this.resources = resources;
 		this.numberOfSlots = numberOfSlots;
 		
-		this.availableSlots = new ArrayDeque<Integer>();
+		this.availableSlots = new ArrayDeque<Integer>(numberOfSlots);
 		for (int i = 0; i < numberOfSlots; i++) {
 			this.availableSlots.add(i);
 		}
 	}
 
-	public TaskOperationProtocol getTaskManagerProxy() throws IOException {
-		TaskOperationProtocol tm = this.taskManager;
-		
-		if (tm == null) {
-			synchronized (this) {
-				if (this.taskManager == null) {
-					this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
-						new InetSocketAddress(getInstanceConnectionInfo().address(),
-							getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
-				}
-				tm = this.taskManager;
-			}
-		}
-		
-		return tm;
+	// --------------------------------------------------------------------------------------------
+	// Properties
+	// --------------------------------------------------------------------------------------------
+	
+	public InstanceID getId() {
+		return instanceId;
 	}
-
-	/**  Destroys and removes the RPC stub object for this instance's task manager. */
-	private void destroyTaskManagerProxy() {
-		synchronized (this) {
-			if (this.taskManager != null) {
-				try {
-					RPC.stopProxy(this.taskManager);
-				} catch (Throwable t) {
-					Log.debug("Error shutting down RPC proxy.", t);
-				}
-			}
-		}
+	
+	public HardwareDescription getResources() {
+		return this.resources;
+	}
+	
+	public int getTotalNumberOfSlots() {
+		return numberOfSlots;
+	}
+	
+	/**
+	 * Returns the instance's connection information object.
+	 * 
+	 * @return the instance's connection information object
+	 */
+	public InstanceConnectionInfo getInstanceConnectionInfo() {
+		return this.instanceConnectionInfo;
 	}
-
-
 	
 	// --------------------------------------------------------------------------------------------
 	// Life and Death
@@ -136,38 +138,52 @@ public class Instance {
 		isDead = true;
 		
 		synchronized (instanceLock) {
-			this.allocatedSlots.clear();
+			
+			// no more notifications for the slot releasing
+			this.slotListener = null;
+			
 			for (AllocatedSlot slot : allocatedSlots) {
-				slot.cancelResource();
+				slot.releaseSlot();
 			}
+			allocatedSlots.clear();
+			availableSlots.clear();
 		}
 		
 		destroyTaskManagerProxy();
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	// Properties
+	//  Connection to the TaskManager
 	// --------------------------------------------------------------------------------------------
 	
-	public InstanceID getId() {
-		return instanceId;
-	}
-	
-	public HardwareDescription getResources() {
-		return this.resources;
-	}
-	
-	public int getTotalNumberOfSlots() {
-		return numberOfSlots;
+	public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+		TaskOperationProtocol tm = this.taskManager;
+		
+		if (tm == null) {
+			synchronized (this) {
+				if (this.taskManager == null) {
+					this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+						new InetSocketAddress(getInstanceConnectionInfo().address(),
+							getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+				}
+				tm = this.taskManager;
+			}
+		}
+		
+		return tm;
 	}
-	
-	/**
-	 * Returns the instance's connection information object.
-	 * 
-	 * @return the instance's connection information object
-	 */
-	public InstanceConnectionInfo getInstanceConnectionInfo() {
-		return this.instanceConnectionInfo;
+
+	/**  Destroys and removes the RPC stub object for this instance's task manager. */
+	private void destroyTaskManagerProxy() {
+		synchronized (this) {
+			if (this.taskManager != null) {
+				try {
+					RPC.stopProxy(this.taskManager);
+				} catch (Throwable t) {
+					Log.debug("Error shutting down RPC proxy.", t);
+				}
+			}
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -206,7 +222,11 @@ public class Instance {
 	// Resource allocation
 	// --------------------------------------------------------------------------------------------
 	
-	public AllocatedSlot allocateSlot(JobID jobID, ResourceId resourceId) throws InstanceDiedException {
+	public AllocatedSlot allocateSlot(JobID jobID) throws InstanceDiedException {
+		if (jobID == null) {
+			throw new IllegalArgumentException();
+		}
+		
 		synchronized (instanceLock) {
 			if (isDead) {
 				throw new InstanceDiedException(this);
@@ -216,27 +236,95 @@ public class Instance {
 			if (nextSlot == null) {
 				return null;
 			} else {
-				AllocatedSlot slot = new AllocatedSlot(jobID, resourceId, this, nextSlot);
+				AllocatedSlot slot = new AllocatedSlot(jobID, this, nextSlot);
 				allocatedSlots.add(slot);
 				return slot;
 			}
 		}
 	}
 	
+	public boolean returnAllocatedSlot(AllocatedSlot slot) {
+		// the slot needs to be in the "returned to instance" state (it must no longer be alive)
+		if (slot == null || slot.getInstance() != this) {
+			throw new IllegalArgumentException("Slot is null or belongs to the wrong instance.");
+		}
+		if (slot.isAlive()) {
+			throw new IllegalArgumentException("Slot is still alive");
+		}
+		
+		if (slot.markReleased()) { 
+			synchronized (instanceLock) {
+				if (isDead) {
+					return false;
+				}
+			
+				if (this.allocatedSlots.remove(slot)) {
+					this.availableSlots.add(slot.getSlotNumber());
+					
+					if (this.slotListener != null) {
+						this.slotListener.newSlotAvailable(this);
+					}
+					
+					return true;
+				} else {
+					throw new IllegalArgumentException("Slot was not allocated from the instance.");
+				}
+			}
+		} else {
+			return false;
+		}
+	}
+	
+	public void cancelAndReleaseAllSlots() {
+		synchronized (instanceLock) {
+			// we need to do this copy because of concurrent modification exceptions
+			List<AllocatedSlot> copy = new ArrayList<AllocatedSlot>(this.allocatedSlots);
+			
+			for (AllocatedSlot slot : copy) {
+				slot.releaseSlot();
+			}
+			allocatedSlots.clear();
+		}
+	}
+	
 	public int getNumberOfAvailableSlots() {
 		return this.availableSlots.size();
 	}
 	
+	public int getNumberOfAllocatedSlots() {
+		return this.allocatedSlots.size();
+	}
+	
 	public boolean hasResourcesAvailable() {
 		return !isDead && getNumberOfAvailableSlots() > 0;
 	}
 	
 	// --------------------------------------------------------------------------------------------
+	// Listeners
+	// --------------------------------------------------------------------------------------------
+	
+	public void setSlotAvailabilityListener(SlotAvailablilityListener slotListener) {
+		synchronized (instanceLock) {
+			if (this.slotListener != null) {
+				throw new IllegalStateException("Instance has already a slot listener.");
+			} else {
+				this.slotListener = slotListener;
+			}
+		}
+	}
+	
+	public void removeSlotListener() {
+		synchronized (instanceLock) {
+			this.slotListener = null;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
 	// Standard Utilities
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
-		return "Instance (" + this.instanceConnectionInfo + "), resources: " + this.resources + ", numberOfSlots=" + numberOfSlots;
+		return instanceId + " @" + this.instanceConnectionInfo + ' ' + numberOfSlots + " slots";
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index 4cbb2a9..ec63c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -67,18 +67,18 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 *        the port instance's task manager expects to receive transfer envelopes on
 	 */
 	public InstanceConnectionInfo(InetAddress inetAddress, int ipcPort, int dataPort) {
-
 		if (inetAddress == null) {
 			throw new IllegalArgumentException("Argument inetAddress must not be null");
 		}
-
 		if (ipcPort <= 0) {
 			throw new IllegalArgumentException("Argument ipcPort must be greater than zero");
 		}
-
 		if (dataPort <= 0) {
 			throw new IllegalArgumentException("Argument dataPort must be greater than zero");
 		}
+		if (ipcPort == dataPort) {
+			throw new IllegalArgumentException("IPC and data port must be different");
+		}
 
 		this.ipcPort = ipcPort;
 		this.dataPort = dataPort;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
index 388bba2..9c922af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
@@ -16,15 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.AbstractID;
 
 /**
  * Class for statistically unique instance IDs.
- * 
  */
-public class InstanceID extends AbstractID {
-
-}
+public class InstanceID extends AbstractID {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
index 76e63b8..d42d404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
@@ -24,7 +24,17 @@ package org.apache.flink.runtime.instance;
  */
 public interface InstanceListener {
 
+	/**
+	 * Called when a new instance becomes available.
+	 * 
+	 * @param instance The instance that became available.
+	 */
 	void newInstanceAvailable(Instance instance);
 	
+	/**
+	 * Called when an instance died.
+	 * 
+	 * @param instance The instance that died.
+	 */
 	void instanceDied(Instance instance);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
index ba76e02..6a56c19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
@@ -16,15 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.runtime.AbstractID;
 
 /**
  * A class for statistically unique job vertex IDs.
- * 
  */
-public class JobVertexID extends AbstractID {
-
-}
+public class JobVertexID extends AbstractID {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 1f5fc96..3b76b78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -70,11 +70,11 @@ import org.apache.flink.runtime.executiongraph.GraphConversionException;
 import org.apache.flink.runtime.executiongraph.InternalJobStatus;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.instance.DefaultInstanceManager;
-import org.apache.flink.runtime.instance.DummyInstance;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
 import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
@@ -87,8 +87,6 @@ import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
 import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
 import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
 import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulingException;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager;
 import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
 import org.apache.flink.runtime.managementgraph.ManagementGraph;
@@ -101,14 +99,13 @@ import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 import org.apache.flink.runtime.protocols.JobManagerProtocol;
 import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
-import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
 import org.apache.flink.runtime.taskmanager.TaskCancelResult;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskKillResult;
 import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.SerializableArrayList;
 import org.apache.flink.util.StringUtils;
 import org.apache.log4j.ConsoleAppender;
@@ -130,32 +127,32 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 	private static final Log LOG = LogFactory.getLog(JobManager.class);
 
+	private final static int FAILURE_RETURN_CODE = 1;
+	
+	
+	private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
+	
 	private final Server jobManagerServer;
 
-	private final JobManagerProfiler profiler;
-
 	private final EventCollector eventCollector;
 	
 	private final ArchiveListener archive;
 
-	private final InputSplitManager inputSplitManager;
-
+	private final InstanceManager instanceManager;
+	
 	private final DefaultScheduler scheduler;
 	
-	private AccumulatorManager accumulatorManager;
-
-	private InstanceManager instanceManager;
+	private final AccumulatorManager accumulatorManager;
 
+	
 	private final int recommendedClientPollingInterval;
 
-	private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
-
-	private final static int FAILURE_RETURN_CODE = 1;
-
+	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 
 	private volatile boolean isShutDown;
 	
+	
 	private WebInfoServer server;
 	
 	
@@ -199,8 +196,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		// accumulator results.
 		this.accumulatorManager = new AccumulatorManager(Math.min(1, archived_items));
 
-		// Load the input split manager
-		this.inputSplitManager = new InputSplitManager();
 
 		// Determine own RPC address
 		final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort);
@@ -219,7 +214,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 		// Try to load the instance manager for the given execution mode
 		if (executionMode == ExecutionMode.LOCAL) {
-			this.instanceManager = new Lo
+			final int numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+			this.instanceManager = new LocalInstanceManager(numTaskManagers);
 		}
 		else if (executionMode == ExecutionMode.CLUSTER) {
 			this.instanceManager = new DefaultInstanceManager();
@@ -237,19 +233,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		if (this.scheduler == null) {
 			throw new Exception("Unable to load scheduler " + schedulerClassName);
 		}
-
-		// Load profiler if it should be used
-		if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
-			final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.JOBMANAGER_CLASSNAME_KEY,
-				"org.apache.flink.runtime.profiling.impl.JobManagerProfilerImpl");
-			this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName, ipcAddress);
-			if (this.profiler == null) {
-				throw new Exception("Cannot load profiler");
-			}
-		} else {
-			this.profiler = null;
-			LOG.debug("Profiler disabled");
-		}
 	}
 
 	public void shutdown() {
@@ -263,11 +246,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			this.instanceManager.shutdown();
 		}
 
-		// Stop profiling if enabled
-		if (this.profiler != null) {
-			this.profiler.shutdown();
-		}
-
 		// Stop RPC server
 		if (this.jobManagerServer != null) {
 			this.jobManagerServer.stop();
@@ -458,12 +436,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 				LOG.debug("The dependency chain for instance sharing is acyclic");
 			}
 	
-			// Check if the job will be executed with profiling enabled
-			boolean jobRunsWithProfiling = false;
-			if (this.profiler != null && job.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
-				jobRunsWithProfiling = true;
-			}
-	
 			// Try to create initial execution graph from job graph
 			LOG.info("Creating initial execution graph from job graph " + job.getName());
 			ExecutionGraph eg;
@@ -485,22 +457,9 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	
 			// Register job with the progress collector
 			if (this.eventCollector != null) {
-				this.eventCollector.registerJob(eg, jobRunsWithProfiling, System.currentTimeMillis());
+				this.eventCollector.registerJob(eg, false, System.currentTimeMillis());
 			}
 	
-			// Check if profiling should be enabled for this job
-			if (jobRunsWithProfiling) {
-				this.profiler.registerProfilingJob(eg);
-	
-				if (this.eventCollector != null) {
-					this.profiler.registerForProfilingData(eg.getJobID(), this.eventCollector);
-				}
-	
-			}
-	
-			// Register job with the dynamic input split assigner
-			this.inputSplitManager.registerJob(eg);
-	
 			// Register for updates on the job status
 			eg.registerJobStatusListener(this);
 	
@@ -902,41 +861,12 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		eg.executeCommand(runnable);
 	}
 
-
-	@Override
-	public void killInstance(final StringRecord instanceName) throws IOException {
-
-		final Instance instance = this.instanceManager.getInstanceByName(instanceName.toString());
-		if (instance == null) {
-			LOG.error("Cannot find instance with name " + instanceName + " to kill it");
-			return;
-		}
-
-		LOG.info("Killing task manager on instance " + instance);
-
-		final Runnable runnable = new Runnable() {
-
-			@Override
-			public void run() {
-				try {
-					instance.killTaskManager();
-				} catch (IOException ioe) {
-					LOG.error(ioe);
-				}
-			}
-		};
-
-		// Hand it over to the executor service
-		this.executorService.execute(runnable);
-	}
-
 	/**
 	 * Tests whether the job manager has been shut down completely.
 	 * 
 	 * @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
 	 */
 	public boolean isShutDown() {
-
 		return this.isShutDown;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index e038d7d..3cae48b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -18,170 +18,293 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayDeque;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
  * slots.
- * <p>
- * The scheduler's bookkeeping on the available instances is lazy: It is not modified once an
- * instance is dead, but it will lazily remove the instance from its pool as soon as it tries
- * to allocate a resource on that instance and it fails with an {@link InstanceDiedException}.
  */
-public class DefaultScheduler implements InstanceListener {
+public class DefaultScheduler implements InstanceListener, SlotAvailablilityListener {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
-
 	
-	private final Object lock = new Object();
+	
+	private final Object globalLock = new Object();
+	
 	
 	/** All instances that the scheduler can deploy to */
 	private final Set<Instance> allInstances = new HashSet<Instance>();
 	
 	/** All instances that still have available resources */
-	private final Queue<Instance> instancesWithAvailableResources = new LifoSetQueue<Instance>();
-
-	
-	private final ConcurrentHashMap<ResourceId, AllocatedSlot> allocatedSlots = new ConcurrentHashMap<ResourceId, AllocatedSlot>();
-	
-//	/** A cache that remembers the last resource IDs it has seen, to co-locate future
-//	 *  deployments of tasks with the same resource ID to the same instance.
-//	 */
-//	private final Cache<ResourceId, Instance> ghostCache;
+	private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
 	
 	/** All tasks pending to be scheduled */
-	private final Queue<ScheduledUnit> taskQueue = new ArrayDeque<ScheduledUnit>();
-
+	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
 	
-	/** The thread that runs the scheduling loop, picking up tasks to be scheduled and scheduling them. */
-	private final Thread schedulerThread;
 	
+	private int unconstrainedAssignments = 0;
 	
-	/** Atomic flag to safely control the shutdown */
-	private final AtomicBoolean shutdown = new AtomicBoolean(false);
+	private int localizedAssignments = 0;
 	
-	/** Flag indicating whether the scheduler should reject a unit if it cannot find a resource
-	 * for it at the time of scheduling */
-	private final boolean rejectIfNoResourceAvailable;
+	private int nonLocalizedAssignments = 0;
 	
-
 	
 	public DefaultScheduler() {
-		this(true);
-	}
-	
-	public DefaultScheduler(boolean rejectIfNoResourceAvailable) {
-		this.rejectIfNoResourceAvailable = rejectIfNoResourceAvailable;
-		
-//		this.ghostCache = CacheBuilder.newBuilder()
-//				.initialCapacity(64)	// easy start
-//				.maximumSize(1024)		// retain some history
-//				.weakValues()			// do not prevent dead instances from being collected
-//				.build();
-		
-		// set up (but do not start) the scheduling thread
-		Runnable loopRunner = new Runnable() {
-			@Override
-			public void run() {
-				runSchedulerLoop();
-			}
-		};
-		this.schedulerThread = new Thread(loopRunner, "Scheduling Thread");
-	}
-	
-	public void start() {
-		if (shutdown.get()) {
-			throw new IllegalStateException("Scheduler has been shut down.");
-		}
-		
-		try {
-			this.schedulerThread.start();
-		}
-		catch (IllegalThreadStateException e) {
-			throw new IllegalStateException("The scheduler has already been started.");
-		}
 	}
 	
 	/**
 	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
 	 */
 	public void shutdown() {
-		if (this.shutdown.compareAndSet(false, true)) {
-			// clear the task queue and add the termination signal, to let
-			// the scheduling loop know that things are done
-			this.taskQueue.clear();
-			this.taskQueue.add(TERMINATION_SIGNAL);
-			
-			// interrupt the scheduling thread, in case it was waiting for resources to
-			// show up to deploy a task
-			this.schedulerThread.interrupt();
+		synchronized (globalLock) {
+			for (Instance i : allInstances) {
+				i.removeSlotListener();
+				i.cancelAndReleaseAllSlots();
+			}
+			allInstances.clear();
+			instancesWithAvailableResources.clear();
+			taskQueue.clear();
 		}
 	}
-	
-	public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler) {
-		if (this.schedulerThread.getState() != Thread.State.NEW) {
-			throw new IllegalStateException("Can only add exception handler before starting the scheduler.");
+
+	/**
+	 * Sums the available slots of all instances that currently have available resources.
+	 * NOTE: In the presence of multi-threaded operations, this number may be inexact.
+	 * 
+	 * @return The number of empty slots available for tasks.
+	 */
+	public int getNumberOfAvailableSlots() {
+		int count = 0;
+		
+		synchronized (globalLock) {
+			for (Instance instance : instancesWithAvailableResources) {
+				count += instance.getNumberOfAvailableSlots();
+			}
 		}
-		this.schedulerThread.setUncaughtExceptionHandler(handler);
+		
+		return count;
 	}
-
 	
 	// --------------------------------------------------------------------------------------------
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
 	
+	public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
+		Object ret = scheduleTask(task, false);
+		if (ret instanceof AllocatedSlot) {
+			return (AllocatedSlot) ret;
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+	
+	public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
+		Object ret = scheduleTask(task, true);
+		if (ret instanceof AllocatedSlot) {
+			return new SlotAllocationFuture((AllocatedSlot) ret);
+		}
+		if (ret instanceof SlotAllocationFuture) {
+			return (SlotAllocationFuture) ret;
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+	
 	/**
-	 * @param task
-	 * @param queueIfNoResource If true, this call will queue the request if no resource is immediately
-	 *                          available. If false, it will throw a {@link NoResourceAvailableException}
-	 *                          if no resource is immediately available.
+	 * Returns either an {@link AllocatedSlot}, or a {@link SlotAllocationFuture} if the request was queued.
 	 */
-	public void scheduleTask(ScheduledUnit task, boolean queueIfNoResource) {
+	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
 			throw new IllegalArgumentException();
 		}
 		
-		// if there is already a slot for that resource
-		AllocatedSlot existing = this.allocatedSlots.get(task.getSharedResourceId());
-		if (existing != null) {
-			// try to attach to the existing slot
-			if (existing.runTask(task.getTaskVertex())) {
-				// all good, we are done
-				return;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Scheduling task " + task);
+		}
+	
+		synchronized (globalLock) {
+			// 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
+//			CoLocationHint hint = task.getCoScheduleHint();
+//			if (hint != null) {
+//				
+//				// try to add to the slot, or make it wait on the hint and schedule the hint itself
+//				if () {
+//					return slot;
+//				}
+//			}
+		
+			// 2) See if we can place the task somewhere together with another existing task.
+			//    This is defined by the slot sharing groups
+			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+			if (sharingUnit != null) {
+				// see if we can add the task to the current sharing group.
+				SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
+				AllocatedSlot slot = assignment.getSlotForTask(task.getJobVertexId(), task.getTaskVertex());
+				if (slot != null) {
+					return slot;
+				}
+			}
+		
+			// 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
+			
+			// we need potentially to loop multiple times, because there may be false positives
+			// in the set-with-available-instances
+			while (true) {
+				Instance instanceToUse = getFreeInstanceForTask(task.getTaskVertex());
+			
+				if (instanceToUse != null) {
+					try {
+						AllocatedSlot slot = instanceToUse.allocateSlot(task.getTaskVertex().getJobId());
+						
+						// if the instance has further available slots, re-add it to the set of available resources.
+						if (instanceToUse.hasResourcesAvailable()) {
+							this.instancesWithAvailableResources.add(instanceToUse);
+						}
+						
+						if (slot != null) {
+							
+							// if the task is in a shared group, assign the slot to that group
+							// and get a sub slot in turn
+							if (sharingUnit != null) {
+								slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
+							}
+							
+							// try to run the task 
+							if (slot.runTask(task.getTaskVertex())) {
+								return slot;
+							} else {
+								// did not assign, so we recycle the resource
+								slot.releaseSlot();
+							}
+						}
+					}
+					catch (InstanceDiedException e) {
+						// the instance died, but that has not yet been propagated to this scheduler
+						// remove the instance from the set of available instances
+						this.allInstances.remove(instanceToUse);
+						this.instancesWithAvailableResources.remove(instanceToUse);
+					}
+				}
+				else {
+					// no resource available now, so queue the request
+					if (queueIfNoResource) {
+						SlotAllocationFuture future = new SlotAllocationFuture();
+						this.taskQueue.add(new QueuedTask(task, future));
+						return future;
+					}
+					else {
+						throw new NoResourceAvailableException(task);
+					}
+				}
 			}
-			// else: the slot was deallocated, we need to proceed as if there was none
 		}
+	}
 		
-		// check if there is a slot that has an available sub-slot for that group-vertex
-		// TODO
+	/**
+	 * Gets a suitable instance to schedule the vertex execution to.
+	 * <p>
+	 * NOTE: This method is not thread-safe; it needs to be synchronized by the caller.
+	 * 
+	 * @param vertex The task to run. 
+	 * @return The instance to run the vertex on, or {@code null} if no instance is available.
+	 */
+	protected Instance getFreeInstanceForTask(ExecutionVertex2 vertex) {
+		if (this.instancesWithAvailableResources.isEmpty()) {
+			return null;
+		}
 		
+		Iterable<Instance> locationsIterable = vertex.getPreferredLocations();
+		Iterator<Instance> locations = locationsIterable == null ? null : locationsIterable.iterator();
 		
+		if (locations != null && locations.hasNext()) {
+			
+			while (locations.hasNext()) {
+				Instance location = locations.next();
+				
+				if (location != null && this.instancesWithAvailableResources.remove(location)) {
+					
+					localizedAssignments++;
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
+					}
+					
+					return location;
+				}
+			}
+			
+			Instance instance = this.instancesWithAvailableResources.poll();
+			nonLocalizedAssignments++;
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instance);
+			}
+			return instance;
+		}
+		else {
+			Instance instance = this.instancesWithAvailableResources.poll();
+			unconstrainedAssignments++;
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instance);
+			}
+			
+			return instance;
+		}
+	}
+	
+	@Override
+	public void newSlotAvailable(Instance instance) {
+		
+		// global lock before instance lock, so that the order of acquiring locks is always 1) global, 2) instance
+		synchronized (globalLock) {
+			QueuedTask queued = taskQueue.peek();
+			
+			// the slot was properly released, we can allocate a new one from that instance
+			
+			if (queued != null) {
+				ScheduledUnit task = queued.getTask();
+				
+				try {
+					AllocatedSlot newSlot = instance.allocateSlot(task.getTaskVertex().getJobId());
+					if (newSlot != null && newSlot.runTask(task.getTaskVertex())) {
+						
+						// success, remove from the task queue and notify the future
+						taskQueue.poll();
+						if (queued.getFuture() != null) {
+							queued.getFuture().setSlot(newSlot);
+						}
+					}
+				}
+				catch (InstanceDiedException e) {
+					this.allInstances.remove(instance);
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Instance " + instance + " was marked dead asynchronously.");
+					}
+				}
+			}
+			else {
+				this.instancesWithAvailableResources.add(instance);
+			}
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Instance Availability
 	// --------------------------------------------------------------------------------------------
 	
-	
 	@Override
 	public void newInstanceAvailable(Instance instance) {
 		if (instance == null) {
@@ -195,15 +318,28 @@ public class DefaultScheduler implements InstanceListener {
 		}
 		
 		// synchronize globally for instance changes
-		synchronized (this.lock) {
+		synchronized (this.globalLock) {
+			
 			// check we do not already use this instance
 			if (!this.allInstances.add(instance)) {
 				throw new IllegalArgumentException("The instance is already contained.");
 			}
 			
+			try {
+				instance.setSlotAvailabilityListener(this);
+			}
+			catch (IllegalStateException e) {
+				this.allInstances.remove(instance);
+				LOG.error("Scheduler could not attach to the instance as a listener.");
+			}
+			
+			
 			// add it to the available resources and let potential waiters know
 			this.instancesWithAvailableResources.add(instance);
-			this.lock.notifyAll();
+			
+			for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
+				newSlotAvailable(instance);
+			}
 		}
 	}
 	
@@ -216,173 +352,57 @@ public class DefaultScheduler implements InstanceListener {
 		instance.markDead();
 		
 		// we only remove the instance from the pools, we do not care about the 
-		synchronized (this.lock) {
+		synchronized (this.globalLock) {
 			// the instance must not be available anywhere any more
 			this.allInstances.remove(instance);
 			this.instancesWithAvailableResources.remove(instance);
 		}
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	//  Status reporting
+	// --------------------------------------------------------------------------------------------
+
 	public int getNumberOfAvailableInstances() {
-		synchronized (lock) {
-			return allInstances.size();
-		}
+		return allInstances.size();
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	//  Scheduling
-	// --------------------------------------------------------------------------------------------
+	public int getNumberOfInstancesWithAvailableSlots() {
+		return instancesWithAvailableResources.size();
+	}
 	
-//	/**
-//	 * Schedules the given unit to an available resource. This call blocks if no resource
-//	 * is currently available
-//	 * 
-//	 * @param unit The unit to be scheduled.
-//	 */
-//	protected void scheduleQueuedUnit(ScheduledUnit unit) {
-//		if (unit == null) {
-//			throw new IllegalArgumentException("Unit to schedule must not be null.");
-//		}
-//		
-//		// see if the resource Id has already an assigned resource
-//		AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
-//		
-//		if (resource == null) {
-//			// not yet allocated. find a slot to schedule to
-//			try {
-//				resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
-//				if (resource == null) {
-//					throw new RuntimeException("Error: The resource to schedule to is null.");
-//				}
-//			}
-//			catch (Exception e) {
-//				// we cannot go on, the task needs to know what to do now.
-//				unit.getTaskVertex().handleException(e);
-//				return;
-//			}
-//		}
-//		
-//		resource.runTask(unit.getTaskVertex());
-//	}
+	public int getNumberOfUnconstrainedAssignments() {
+		return unconstrainedAssignments;
+	}
 	
-	/**
-	 * Acquires a resource to schedule the given unit to. This call may block if no
-	 * resource is currently available, or throw an exception, based on the given flag.
-	 * 
-	 * @param unit The unit to find a resource for.
-	 * @return The resource to schedule the execution of the given unit on.
-	 * 
-	 * @throws NoResourceAvailableException If the {@code exceptionOnNoAvailability} flag is true and the scheduler
-	 *                                      has currently no resources available.
-	 */
-	protected AllocatedSlot getNewSlotForTask(ScheduledUnit unit, boolean queueIfNoResource) 
-		throws NoResourceAvailableException
-	{
-		synchronized (this.lock) {
-			Instance instanceToUse = this.instancesWithAvailableResources.poll();
-			
-			// if there is nothing, throw an exception or wait, depending on what is configured
-			if (instanceToUse != null) {
-				try {
-					AllocatedSlot slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
-					
-					// if the instance has further available slots, re-add it to the set of available resources.
-					if (instanceToUse.hasResourcesAvailable()) {
-						this.instancesWithAvailableResources.add(instanceToUse);
-					}
-					
-					if (slot != null) {
-						AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
-						if (previous != null) {
-							// concurrently, someone allocated a slot for that ID
-							// release the new one
-							slot.cancelResource();
-							slot = previous;
-						}
-					}
-					// else fall through the loop
-				}
-				catch (InstanceDiedException e) {
-					// the instance died it has not yet been propagated to this scheduler
-					// remove the instance from the set of available instances
-					this.allInstances.remove(instanceToUse);
-				}
-			}
-				
-			
-			if (queueIfNoResource) {
-				this.taskQueue.add(unit);
-			}
-			else {
-				throw new NoResourceAvailableException(unit);
-			}
-				// at this point, we have an instance. request a slot from the instance
-				
-				
-				// if the instance has further available slots, re-add it to the set of available
-				// resources.
-				// if it does not, but asynchronously a slot became available, we may attempt to add the
-				// instance twice, which does not matter because of the set semantics of the "instancesWithAvailableResources"
-				if (instanceToUse.hasResourcesAvailable()) {
-					this.instancesWithAvailableResources.add(instanceToUse);
-				}
-				
-				if (slot != null) {
-					AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
-					if (previous != null) {
-						// concurrently, someone allocated a slot for that ID
-						// release the new one
-						slot.cancelResource();
-						slot = previous;
-					}
-				}
-				// else fall through the loop
-			}
-		}
-		
-		return slot;
+	public int getNumberOfLocalizedAssignments() {
+		return localizedAssignments;
 	}
 	
-	protected void runSchedulerLoop() {
-		// while the scheduler is alive
-		while (!shutdown.get()) {
-			
-			// get the next unit
-			ScheduledUnit next = null;
-			try {
-				next = this.taskQueue.take();
-			}
-			catch (InterruptedException e) {
-				if (shutdown.get()) {
-					return;
-				} else {
-					LOG.error("Scheduling loop was interrupted.");
-				}
-			}
-			
-			// if we see this special unit, it means we are done
-			if (next == TERMINATION_SIGNAL) {
-				return;
-			}
-			
-			// deploy the next scheduling unit
-			try {
-				scheduleNextUnit(next);
-			}
-			catch (Throwable t) {
-				// ignore the errors in the presence of a shutdown
-				if (!shutdown.get()) {
-					if (t instanceof Error) {
-						throw (Error) t;
-					} else if (t instanceof RuntimeException) {
-						throw (RuntimeException) t;
-					} else {
-						throw new RuntimeException("Critical error in scheduler thread.", t);
-					}
-				}
-			}
-		}
+	public int getNumberOfNonLocalizedAssignments() {
+		return nonLocalizedAssignments;
 	}
 	
-	private static final ScheduledUnit TERMINATION_SIGNAL = new ScheduledUnit();
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class QueuedTask {
+		
+		private final ScheduledUnit task;
+		
+		private final SlotAllocationFuture future;
+		
+		
+		public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+			this.task = task;
+			this.future = future;
+		}
+
+		public ScheduledUnit getTask() {
+			return task;
+		}
+
+		public SlotAllocationFuture getFuture() {
+			return future;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
deleted file mode 100644
index 47aadf9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.Comparator;
-
-import org.apache.flink.runtime.instance.Instance;
-
-public class InstanceFillDegreeComparator implements Comparator<Instance> {
-
-	@Override
-	public int compare(Instance o1, Instance o2) {
-		float fraction1 = o1.getNumberOfAvailableSlots() / (float) o1.getTotalNumberOfSlots();
-		float fraction2 = o2.getNumberOfAvailableSlots() / (float) o2.getTotalNumberOfSlots();
-		
-		return fraction1 < fraction2 ? -1 : 1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 338529f..2b0de6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -1,71 +1,75 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public class ScheduledUnit {
 	
-	private final JobID jobId;
-	
 	private final ExecutionVertex2 taskVertex;
 	
-	private final ResourceId resourceId;
-	
-	private final AtomicBoolean scheduled = new AtomicBoolean(false);
+	private final SlotSharingGroup sharingGroup;
 	
+	// --------------------------------------------------------------------------------------------
 	
-	public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex) {
-		this(jobId, taskVertex, new ResourceId());
+	public ScheduledUnit(ExecutionVertex2 taskVertex) {
+		if (taskVertex == null) {
+			throw new NullPointerException();
+		}
+		
+		this.taskVertex = taskVertex;
+		this.sharingGroup = null;
 	}
 	
-	public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex, ResourceId resourceId) {
-		if (jobId == null || taskVertex == null || resourceId == null) {
+	public ScheduledUnit(ExecutionVertex2 taskVertex, SlotSharingGroup sharingUnit) {
+		if (taskVertex == null) {
 			throw new NullPointerException();
 		}
 		
-		this.jobId = jobId;
 		this.taskVertex = taskVertex;
-		this.resourceId = resourceId;
+		this.sharingGroup = sharingUnit;
 	}
 	
 	ScheduledUnit() {
-		this.jobId = null;
 		this.taskVertex = null;
-		this.resourceId = null;
+		this.sharingGroup = null;
 	}
 
+	// --------------------------------------------------------------------------------------------
 	
-	public JobID getJobId() {
-		return jobId;
+	public JobVertexID getJobVertexId() {
+		return this.taskVertex.getJobvertexId();
 	}
 	
 	public ExecutionVertex2 getTaskVertex() {
 		return taskVertex;
 	}
 	
-	public ResourceId getSharedResourceId() {
-		return resourceId;
+	public SlotSharingGroup getSlotSharingGroup() {
+		return sharingGroup;
 	}
+
+	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
-		return "(job=" + jobId + ", resourceId=" + resourceId + ", vertex=" + taskVertex + ')';
+		return "{vertex=" + taskVertex.getSimpleName() + ", sharingUnit=" + sharingGroup + '}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
deleted file mode 100644
index 3bca46b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-/**
- * The scheduling strategy describes how scheduler distributes tasks across resources.
- */
-public enum SchedulingStrategy {
-	
-	/**
-	 * This strategy tries to keep all machines utilized roughly the same.
-	 */
-	SPREAD_OUT_TASKS,
-	
-	/**
-	 * This strategy will put as many tasks on one each machine as possible, before putting
-	 * tasks on the next machine.
-	 */
-	CLUSTER_TASKS
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
new file mode 100644
index 0000000..56b1146
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Queue;
+
+/**
+ * A queue that maintains set characteristics, i.e., elements
+ * that are already in the queue may not be added another time.
+ * Elements leave the queue in the order in which they were first added.
+ * The class does not synchronize; concurrent use needs external locking.
+ *
+ * @param <E> The type of the elements in the queue.
+ */
+public class SetQueue<E> extends AbstractQueue<E> implements Queue<E> {
+
+	/** The backing set; its insertion order is the queue order. */
+	private final LinkedHashSet<E> set = new LinkedHashSet<E>();
+	
+	/**
+	 * Adds the element at the tail, unless an equal element is already queued.
+	 * 
+	 * @param e The element to add, must not be null.
+	 * @return Always true, also if the element was already contained.
+	 * @throws NullPointerException Thrown, if the element is null.
+	 */
+	@Override
+	public boolean offer(E e) {
+		if (e == null) {
+			throw new NullPointerException();
+		}
+		
+		// may, or may not, add the element.
+		set.add(e);
+		
+		// we always return true, because the queue did not reject the element
+		// due to capacity constraints
+		return true;
+	}
+
+	/**
+	 * Removes and returns the head of the queue.
+	 * 
+	 * @return The least recently added element, or null, if the queue is empty.
+	 */
+	@Override
+	public E poll() {
+		Iterator<E> iter = set.iterator();
+		if (iter.hasNext()) {
+			E next = iter.next();
+			iter.remove();
+			return next;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Returns the head of the queue, without removing it.
+	 * 
+	 * @return The least recently added element, or null, if the queue is empty.
+	 */
+	@Override
+	public E peek() {
+		Iterator<E> iter = set.iterator();
+		if (iter.hasNext()) {
+			return iter.next();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		return set.iterator();
+	}
+
+	@Override
+	public int size() {
+		return set.size();
+	}
+	
+	// the operations below delegate directly to the backing set instead of
+	// inheriting the implementations of the super class
+	
+	@Override
+	public void clear() {
+		set.clear();
+	}
+	
+	@Override
+	public boolean remove(Object o) {
+		return set.remove(o);
+	}
+	
+	@Override
+	public boolean contains(Object o) {
+		return set.contains(o);
+	}
+	
+	@Override
+	public boolean removeAll(Collection<?> c) {
+		return set.removeAll(c);
+	}
+	
+	@Override
+	public boolean containsAll(Collection<?> c) {
+		return set.containsAll(c);
+	}
+	
+	@Override
+	public boolean retainAll(Collection<?> c) {
+		return set.retainAll(c);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return set.hashCode();
+	}
+	
+	/**
+	 * Checks whether the given object is a queue of the same class with the same
+	 * elements. The order of the elements is not part of the comparison.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		// compare against the runtime class (not SetQueue.class), so that the
+		// relation stays symmetric when one of the two queues is a subclass
+		if (obj != null && obj.getClass() == getClass()) {
+			return set.equals(((SetQueue<?>) obj).set);
+		}
+		else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return set.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
new file mode 100644
index 0000000..36d8a8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+public class SharedSlot {
+	/** The underlying slot; it is released when a returned sub-slot triggers the release. */
+	private final AllocatedSlot allocatedSlot;
+	/** Asked on every returned sub-slot whether the allocated slot is to be released. */
+	private final SlotSharingGroupAssignment assignmentGroup;
+	/** The sub-slots currently handed out; this set is also used as the lock for state changes. */
+	private final Set<SubSlot> subSlots;
+	/** The number passed to the next sub-slot that is created. */
+	private int subSlotNumber;
+	/** Set when the release of the allocated slot is decided; no sub-slots are handed out afterwards. */
+	private volatile boolean disposed;
+	
+	// --------------------------------------------------------------------------------------------
+	/** Creates a shared slot with no sub-slots; throws a NullPointerException on a null argument. */
+	public SharedSlot(AllocatedSlot allocatedSlot, SlotSharingGroupAssignment assignmentGroup) {
+		if (allocatedSlot == null || assignmentGroup == null) {
+			throw new NullPointerException();
+		}
+		
+		this.allocatedSlot = allocatedSlot;
+		this.assignmentGroup = assignmentGroup;
+		this.subSlots = new HashSet<SubSlot>();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public AllocatedSlot getAllocatedSlot() {
+		return this.allocatedSlot;
+	}
+	
+	public boolean isDisposed() {
+		return disposed;
+	}
+	/** Returns the number of sub-slots that are currently handed out. */
+	public int getNumberOfAllocatedSubSlots() {
+		synchronized (this.subSlots) {
+			return this.subSlots.size();
+		}
+	}
+	/** Creates a sub-slot for the given vertex id, or returns null if this slot is disposed. */
+	public SubSlot allocateSubSlot(JobVertexID jid) {
+		synchronized (this.subSlots) {
+			if (isDisposed()) {
+				return null;
+			} else {
+				SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
+				this.subSlots.add(ss);
+				return ss;
+			}
+		}
+	}
+	/** Takes back the sub-slot; throws an IllegalArgumentException if it is not from this slot. */
+	void returnAllocatedSlot(SubSlot slot) {
+		boolean release;
+		
+		synchronized (this.subSlots) {
+			if (!this.subSlots.remove(slot)) {
+				throw new IllegalArgumentException("Wrong shared slot for subslot.");
+			}
+			// the last argument tells the group whether no sub-slots remain after this return
+			release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+			
+			if (release) {
+				disposed = true;
+			}
+		}
+		
+		// do this call outside the lock, because releasing the allocated slot may go into further scheduler calls
+		if (release) {
+			this.allocatedSlot.releaseSlot();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
new file mode 100644
index 0000000..679cccc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+
+/**
+ * A future for an {@link AllocatedSlot} that may not have been allocated yet.
+ * The slot is supplied via {@link #setSlot(AllocatedSlot)}. It can be obtained by blocking in
+ * {@link #waitTillAllocated()}, or through a callback registered via
+ * {@link #setFutureAction(SlotAllocationFutureAction)}.
+ */
+public class SlotAllocationFuture {
+	
+	/** Lock for the state changes, and monitor on which waiting threads block. */
+	private final Object monitor = new Object();
+	
+	/** The slot, null while the allocation is pending. */
+	private volatile AllocatedSlot slot;
+	
+	/** The callback to call with the slot, null if none is registered. */
+	private volatile SlotAllocationFutureAction action;
+	
+	// --------------------------------------------------------------------------------------------
+
+	/** Creates a future whose slot is still pending. */
+	public SlotAllocationFuture() {}
+	
+	/**
+	 * Creates a future that already holds the given slot.
+	 * 
+	 * @param slot The allocated slot.
+	 */
+	public SlotAllocationFuture(AllocatedSlot slot) {
+		this.slot = slot;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Blocks until the slot has been set.
+	 * 
+	 * @return The allocated slot.
+	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting.
+	 */
+	public AllocatedSlot waitTillAllocated() throws InterruptedException {
+		return waitTillAllocated(0);
+	}
+	
+	/**
+	 * Blocks until the slot has been set, or until the timeout has elapsed.
+	 * 
+	 * @param timeout The maximum time to wait, in milliseconds. Zero means no time limit.
+	 * @return The allocated slot, or null, if the timeout elapsed before a slot was set.
+	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting.
+	 * @throws IllegalArgumentException Thrown, if the timeout is negative and no slot is set yet.
+	 */
+	public AllocatedSlot waitTillAllocated(long timeout) throws InterruptedException {
+		synchronized (monitor) {
+			if (timeout <= 0) {
+				// zero waits without limit, a negative value is rejected by Object.wait(long)
+				while (slot == null) {
+					monitor.wait(timeout);
+				}
+				return slot;
+			}
+			
+			// wait only for the remainder of the timeout after each wakeup; waiting for the
+			// full timeout again would block until a slot is set, regardless of the limit
+			final long start = System.nanoTime();
+			long remaining = timeout;
+			
+			while (slot == null && remaining > 0) {
+				monitor.wait(remaining);
+				remaining = timeout - (System.nanoTime() - start) / 1000000L;
+			}
+			
+			return slot;
+		}
+	}
+	
+	/**
+	 * Registers the action to call with the allocated slot. If the slot is already set, the
+	 * action is called right away, within this method.
+	 * 
+	 * @param action The action to call once the slot is available.
+	 * @throws IllegalStateException Thrown, if an action has already been registered.
+	 */
+	public void setFutureAction(SlotAllocationFutureAction action) {
+		synchronized (monitor) {
+			if (this.action != null) {
+				throw new IllegalStateException("Future already has an action registered.");
+			}
+			
+			this.action = action;
+			
+			if (this.slot != null) {
+				action.slotAllocated(this.slot);
+			}
+		}
+	}
+	
+	/**
+	 * Sets the slot of this future, wakes up all threads waiting for it, and calls the
+	 * registered action, if there is one.
+	 * 
+	 * @param slot The allocated slot, must not be null.
+	 * @throws NullPointerException Thrown, if the slot is null.
+	 * @throws IllegalStateException Thrown, if a slot has already been set.
+	 */
+	public void setSlot(AllocatedSlot slot) {
+		if (slot == null) {
+			throw new NullPointerException();
+		}
+		
+		synchronized (monitor) {
+			if (this.slot != null) {
+				throw new IllegalStateException("The future has already been assigned a slot.");
+			}
+			
+			this.slot = slot;
+			monitor.notifyAll();
+			
+			if (action != null) {
+				action.slotAllocated(slot);
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return slot == null ? "PENDING" : "DONE";
+	}
+}