You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:47 UTC

[28/52] [abbrv] flink git commit: [FLINK-4987] Harden SlotPool on JobMaster

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
new file mode 100644
index 0000000..d33bba4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/Clock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.clock;
+
+/**
+ * A clock that gives access to time. This clock returns two flavors of time:
+ * 
+ * <p><b>Absolute Time:</b> This refers to real world wall clock time, and it typically
+ * derived from a system clock. It is subject to clock drift and inaccuracy, and can jump
+ * if the system clock is adjusted.
+ * 
+ * <p><b>Relative Time:</b> This time advances at the same speed as the <i>absolute time</i>,
+ * but the timestamps can only be referred to relative to each other. The timestamps have
+ * no absolute meaning and cannot be compared across JVM processes. The source for the
+ * timestamps is not affected by adjustments to the system clock, so it never jumps.
+ */
+public abstract class Clock {
+
+	public abstract long absoluteTimeMillis();
+
+	public abstract long relativeTimeMillis();
+
+	public abstract long relativeTimeNanos();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
new file mode 100644
index 0000000..789a0b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.clock;
+
+/**
+ * A clock that returns the time of the system / process.
+ * 
+ * <p>This clock uses {@link System#currentTimeMillis()} for <i>absolute time</i>
+ * and {@link System#nanoTime()} for <i>relative time</i>.
+ * 
+ * <p>This SystemClock exists as a singleton instance.
+ */
+public class SystemClock extends Clock {
+
+	private static final SystemClock INSTANCE = new SystemClock();
+
+	public static SystemClock getInstance() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public long absoluteTimeMillis() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public long relativeTimeMillis() {
+		return System.nanoTime() / 1_000_000;
+	}
+
+	@Override
+	public long relativeTimeNanos() {
+		return System.nanoTime();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private SystemClock() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index bc5ddaa..cd1d895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -46,4 +46,9 @@ public class ResourceProfileTest {
 		assertTrue(rp4.isMatching(rp3));
 		assertTrue(rp4.isMatching(rp4));
 	}
+
+	@Test
+	public void testUnknownMatchesUnknown() {
+		assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 655a3ea..33ed679 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -1,135 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AllocatedSlotsTest {
-
-	@Test
-	public void testOperations() throws Exception {
-		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
-
-		final AllocationID allocation1 = new AllocationID();
-		final ResourceID resource1 = new ResourceID("resource1");
-		final Slot slot1 = createSlot(resource1);
-
-		allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
-
-		assertTrue(allocatedSlots.contains(slot1));
-		assertTrue(allocatedSlots.containResource(resource1));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(1, allocatedSlots.size());
-
-		final AllocationID allocation2 = new AllocationID();
-		final Slot slot2 = createSlot(resource1);
-
-		allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
-
-		assertTrue(allocatedSlots.contains(slot1));
-		assertTrue(allocatedSlots.contains(slot2));
-		assertTrue(allocatedSlots.containResource(resource1));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(slot2, allocatedSlots.get(allocation2));
-		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(2, allocatedSlots.size());
-
-		final AllocationID allocation3 = new AllocationID();
-		final ResourceID resource2 = new ResourceID("resource2");
-		final Slot slot3 = createSlot(resource2);
-
-		allocatedSlots.add(allocation3, new SlotDescriptor(slot2), slot3);
-
-		assertTrue(allocatedSlots.contains(slot1));
-		assertTrue(allocatedSlots.contains(slot2));
-		assertTrue(allocatedSlots.contains(slot3));
-		assertTrue(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(slot2, allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-		assertEquals(3, allocatedSlots.size());
-
-		allocatedSlots.remove(slot2);
-
-		assertTrue(allocatedSlots.contains(slot1));
-		assertFalse(allocatedSlots.contains(slot2));
-		assertTrue(allocatedSlots.contains(slot3));
-		assertTrue(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-		assertEquals(2, allocatedSlots.size());
-
-		allocatedSlots.remove(slot1);
-
-		assertFalse(allocatedSlots.contains(slot1));
-		assertFalse(allocatedSlots.contains(slot2));
-		assertTrue(allocatedSlots.contains(slot3));
-		assertFalse(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertNull(allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-		assertEquals(1, allocatedSlots.size());
-
-		allocatedSlots.remove(slot3);
-
-		assertFalse(allocatedSlots.contains(slot1));
-		assertFalse(allocatedSlots.contains(slot2));
-		assertFalse(allocatedSlots.contains(slot3));
-		assertFalse(allocatedSlots.containResource(resource1));
-		assertFalse(allocatedSlots.containResource(resource2));
-
-		assertNull(allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertNull(allocatedSlots.get(allocation3));
-		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
-		assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
-		assertEquals(0, allocatedSlots.size());
-	}
-
-	private Slot createSlot(final ResourceID resourceId) {
-		Slot slot = mock(Slot.class);
-		when(slot.getTaskManagerID()).thenReturn(resourceId);
-		return slot;
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.runtime.clusterframework.types.AllocationID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.junit.Test;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//public class AllocatedSlotsTest {
+//
+//	@Test
+//	public void testOperations() throws Exception {
+//		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+//
+//		final AllocationID allocation1 = new AllocationID();
+//		final ResourceID resource1 = new ResourceID("resource1");
+//		final Slot slot1 = createSlot(resource1);
+//
+//		allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
+//
+//		assertTrue(allocatedSlots.contains(slot1));
+//		assertTrue(allocatedSlots.containResource(resource1));
+//
+//		assertEquals(slot1, allocatedSlots.get(allocation1));
+//		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(1, allocatedSlots.size());
+//
+//		final AllocationID allocation2 = new AllocationID();
+//		final Slot slot2 = createSlot(resource1);
+//
+//		allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
+//
+//		assertTrue(allocatedSlots.contains(slot1));
+//		assertTrue(allocatedSlots.contains(slot2));
+//		assertTrue(allocatedSlots.containResource(resource1));
+//
+//		assertEquals(slot1, allocatedSlots.get(allocation1));
+//		assertEquals(slot2, allocatedSlots.get(allocation2));
+//		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(2, allocatedSlots.size());
+//
+//		final AllocationID allocation3 = new AllocationID();
+//		final ResourceID resource2 = new ResourceID("resource2");
+//		final Slot slot3 = createSlot(resource2);
+//
+//		allocatedSlots.add(allocation3, new SlotDescriptor(slot2), slot3);
+//
+//		assertTrue(allocatedSlots.contains(slot1));
+//		assertTrue(allocatedSlots.contains(slot2));
+//		assertTrue(allocatedSlots.contains(slot3));
+//		assertTrue(allocatedSlots.containResource(resource1));
+//		assertTrue(allocatedSlots.containResource(resource2));
+//
+//		assertEquals(slot1, allocatedSlots.get(allocation1));
+//		assertEquals(slot2, allocatedSlots.get(allocation2));
+//		assertEquals(slot3, allocatedSlots.get(allocation3));
+//		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+//		assertEquals(3, allocatedSlots.size());
+//
+//		allocatedSlots.remove(slot2);
+//
+//		assertTrue(allocatedSlots.contains(slot1));
+//		assertFalse(allocatedSlots.contains(slot2));
+//		assertTrue(allocatedSlots.contains(slot3));
+//		assertTrue(allocatedSlots.containResource(resource1));
+//		assertTrue(allocatedSlots.containResource(resource2));
+//
+//		assertEquals(slot1, allocatedSlots.get(allocation1));
+//		assertNull(allocatedSlots.get(allocation2));
+//		assertEquals(slot3, allocatedSlots.get(allocation3));
+//		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+//		assertEquals(2, allocatedSlots.size());
+//
+//		allocatedSlots.remove(slot1);
+//
+//		assertFalse(allocatedSlots.contains(slot1));
+//		assertFalse(allocatedSlots.contains(slot2));
+//		assertTrue(allocatedSlots.contains(slot3));
+//		assertFalse(allocatedSlots.containResource(resource1));
+//		assertTrue(allocatedSlots.containResource(resource2));
+//
+//		assertNull(allocatedSlots.get(allocation1));
+//		assertNull(allocatedSlots.get(allocation2));
+//		assertEquals(slot3, allocatedSlots.get(allocation3));
+//		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+//		assertEquals(1, allocatedSlots.size());
+//
+//		allocatedSlots.remove(slot3);
+//
+//		assertFalse(allocatedSlots.contains(slot1));
+//		assertFalse(allocatedSlots.contains(slot2));
+//		assertFalse(allocatedSlots.contains(slot3));
+//		assertFalse(allocatedSlots.containResource(resource1));
+//		assertFalse(allocatedSlots.containResource(resource2));
+//
+//		assertNull(allocatedSlots.get(allocation1));
+//		assertNull(allocatedSlots.get(allocation2));
+//		assertNull(allocatedSlots.get(allocation3));
+//		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+//		assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
+//		assertEquals(0, allocatedSlots.size());
+//	}
+//
+//	private Slot createSlot(final ResourceID resourceId) {
+//		Slot slot = mock(Slot.class);
+//		when(slot.getTaskManagerID()).thenReturn(resourceId);
+//		return slot;
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 8e31085..4d58a31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -1,124 +1,123 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AvailableSlotsTest {
-
-	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
-
-	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
-
-	@Test
-	public void testAddAndRemove() throws Exception {
-		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-
-		final ResourceID resource1 = new ResourceID("resource1");
-		final ResourceID resource2 = new ResourceID("resource2");
-
-		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-		final SlotDescriptor slot2 = createSlotDescriptor(resource1);
-		final SlotDescriptor slot3 = createSlotDescriptor(resource2);
-
-		availableSlots.add(slot1);
-		availableSlots.add(slot2);
-		availableSlots.add(slot3);
-
-		assertEquals(3, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1));
-		assertTrue(availableSlots.contains(slot2));
-		assertTrue(availableSlots.contains(slot3));
-		assertTrue(availableSlots.containResource(resource1));
-		assertTrue(availableSlots.containResource(resource2));
-
-		availableSlots.removeByResource(resource1);
-
-		assertEquals(1, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1));
-		assertFalse(availableSlots.contains(slot2));
-		assertTrue(availableSlots.contains(slot3));
-		assertFalse(availableSlots.containResource(resource1));
-		assertTrue(availableSlots.containResource(resource2));
-
-		availableSlots.removeByResource(resource2);
-
-		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1));
-		assertFalse(availableSlots.contains(slot2));
-		assertFalse(availableSlots.contains(slot3));
-		assertFalse(availableSlots.containResource(resource1));
-		assertFalse(availableSlots.containResource(resource2));
-	}
-
-	@Test
-	public void testPollFreeSlot() {
-		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-
-		final ResourceID resource1 = new ResourceID("resource1");
-		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-
-		availableSlots.add(slot1);
-
-		assertEquals(1, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1));
-		assertTrue(availableSlots.containResource(resource1));
-
-		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
-
-		assertEquals(slot1, availableSlots.poll(DEFAULT_TESTING_PROFILE));
-		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1));
-		assertFalse(availableSlots.containResource(resource1));
-	}
-
-	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
-		return createSlotDescriptor(resourceID, new JobID());
-	}
-
-	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
-		return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-	}
-
-	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
-		final ResourceProfile resourceProfile)
-	{
-		return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
-	}
-
-	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
-		final ResourceProfile resourceProfile, final int slotNumber)
-	{
-		TaskManagerLocation location = mock(TaskManagerLocation.class);
-		when(location.getResourceID()).thenReturn(resourceID);
-		return new SlotDescriptor(jobID, location, slotNumber, resourceProfile, mock(TaskManagerGateway.class));
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.api.common.JobID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+//import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+//import org.junit.Test;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//public class AvailableSlotsTest {
+//
+//	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+//
+//	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+//
+//	@Test
+//	public void testAddAndRemove() throws Exception {
+//		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+//
+//		final ResourceID resource1 = new ResourceID("resource1");
+//		final ResourceID resource2 = new ResourceID("resource2");
+//
+//		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+//		final SlotDescriptor slot2 = createSlotDescriptor(resource1);
+//		final SlotDescriptor slot3 = createSlotDescriptor(resource2);
+//
+//		availableSlots.add(slot1);
+//		availableSlots.add(slot2);
+//		availableSlots.add(slot3);
+//
+//		assertEquals(3, availableSlots.size());
+//		assertTrue(availableSlots.contains(slot1));
+//		assertTrue(availableSlots.contains(slot2));
+//		assertTrue(availableSlots.contains(slot3));
+//		assertTrue(availableSlots.containResource(resource1));
+//		assertTrue(availableSlots.containResource(resource2));
+//
+//		availableSlots.removeByResource(resource1);
+//
+//		assertEquals(1, availableSlots.size());
+//		assertFalse(availableSlots.contains(slot1));
+//		assertFalse(availableSlots.contains(slot2));
+//		assertTrue(availableSlots.contains(slot3));
+//		assertFalse(availableSlots.containResource(resource1));
+//		assertTrue(availableSlots.containResource(resource2));
+//
+//		availableSlots.removeByResource(resource2);
+//
+//		assertEquals(0, availableSlots.size());
+//		assertFalse(availableSlots.contains(slot1));
+//		assertFalse(availableSlots.contains(slot2));
+//		assertFalse(availableSlots.contains(slot3));
+//		assertFalse(availableSlots.containResource(resource1));
+//		assertFalse(availableSlots.containResource(resource2));
+//	}
+//
+//	@Test
+//	public void testPollFreeSlot() {
+//		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+//
+//		final ResourceID resource1 = new ResourceID("resource1");
+//		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+//
+//		availableSlots.add(slot1);
+//
+//		assertEquals(1, availableSlots.size());
+//		assertTrue(availableSlots.contains(slot1));
+//		assertTrue(availableSlots.containResource(resource1));
+//
+//		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
+//
+//		assertEquals(slot1, availableSlots.poll(DEFAULT_TESTING_PROFILE));
+//		assertEquals(0, availableSlots.size());
+//		assertFalse(availableSlots.contains(slot1));
+//		assertFalse(availableSlots.containResource(resource1));
+//	}
+//
+//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
+//		return createSlotDescriptor(resourceID, new JobID());
+//	}
+//
+//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
+//		return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//	}
+//
+//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
+//		final ResourceProfile resourceProfile)
+//	{
+//		return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
+//	}
+//
+//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
+//		final ResourceProfile resourceProfile, final int slotNumber)
+//	{
+//		TaskManagerLocation location = mock(TaskManagerLocation.class);
+//		when(location.getResourceID()).thenReturn(resourceID);
+//		return new SlotDescriptor(jobID, location, slotNumber, resourceProfile, mock(ActorGateway.class));
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 30cdbd6..cc1d194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -1,297 +1,299 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SlotPoolTest extends TestLogger {
-
-	private Executor executor;
-
-	private SlotPool slotPool;
-
-	private ResourceManagerGateway resourceManagerGateway;
-
-	@Before
-	public void setUp() throws Exception {
-		this.executor = Executors.newFixedThreadPool(1);
-		this.slotPool = new SlotPool(executor);
-		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway
-			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(Future.class));
-		slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
-		slotPool.setJobManagerLeaderId(UUID.randomUUID());
-	}
-
-	@After
-	public void tearDown() throws Exception {
-	}
-
-	@Test
-	public void testAllocateSimpleSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobID jobID = new JobID();
-		AllocationID allocationID = new AllocationID();
-		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
-		assertFalse(future.isDone());
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-
-		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
-		assertEquals(resourceID, slot.getTaskManagerID());
-		assertEquals(jobID, slot.getJobID());
-		assertEquals(slotPool, slot.getOwner());
-	}
-
-	@Test
-	public void testAllocateSharedSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobVertexID vid = new JobVertexID();
-		SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-		SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-
-		JobID jobID = new JobID();
-		AllocationID allocationID = new AllocationID();
-		Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
-
-		assertFalse(future.isDone());
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-
-		SharedSlot slot = future.get(1, TimeUnit.SECONDS);
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
-		assertEquals(resourceID, slot.getTaskManagerID());
-		assertEquals(jobID, slot.getJobID());
-		assertEquals(slotPool, slot.getOwner());
-
-		SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
-		assertNotNull(simpleSlot);
-		assertTrue(simpleSlot.isAlive());
-	}
-
-	@Test
-	public void testAllocateSlotWithoutResourceManager() throws Exception {
-		slotPool.disconnectResourceManager();
-		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
-		future.handleAsync(
-			new BiFunction<SimpleSlot, Throwable, Void>() {
-				@Override
-				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-					assertNull(simpleSlot);
-					assertNotNull(throwable);
-					return null;
-				}
-			},
-			executor);
-		try {
-			future.get(1, TimeUnit.SECONDS);
-			fail("We expected a ExecutionException.");
-		} catch (ExecutionException ex) {
-			// we expect the exception
-		}
-	}
-
-	@Test
-	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobID jobID = new JobID();
-
-		AllocationID allocationID1 = new AllocationID();
-		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-
-		AllocationID allocationID2 = new AllocationID();
-		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-
-		assertFalse(future1.isDone());
-		assertFalse(future2.isDone());
-		verify(resourceManagerGateway, times(2))
-			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
-		assertFalse(future2.isDone());
-
-		// return this slot to pool
-		slot1.releaseSlot();
-
-		// second allocation fulfilled by previous slot returning
-		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-		assertTrue(future2.isDone());
-
-		assertNotEquals(slot1, slot2);
-		assertTrue(slot1.isReleased());
-		assertTrue(slot2.isAlive());
-		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-	}
-
-	@Test
-	public void testAllocateWithFreeSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobID jobID = new JobID();
-		AllocationID allocationID1 = new AllocationID();
-		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-		assertFalse(future1.isDone());
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
-
-		// return this slot to pool
-		slot1.releaseSlot();
-
-		AllocationID allocationID2 = new AllocationID();
-		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-
-		// second allocation fulfilled by previous slot returning
-		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-		assertTrue(future2.isDone());
-
-		assertNotEquals(slot1, slot2);
-		assertTrue(slot1.isReleased());
-		assertTrue(slot2.isAlive());
-		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-	}
-
-	@Test
-	public void testOfferSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobID jobID = new JobID();
-		AllocationID allocationID = new AllocationID();
-		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
-		assertFalse(future.isDone());
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-
-		// slot from unregistered resource
-		SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
-		assertFalse(slotPool.offerSlot(allocationID, invalid));
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-
-		// reject offering with mismatch allocation id
-		assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
-
-		// accepted slot
-		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
-
-		// conflict offer with using slot
-		SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertFalse(slotPool.offerSlot(allocationID, conflict));
-
-		// duplicated offer with using slot
-		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
-
-		// duplicated offer with free slot
-		slot.releaseSlot();
-		assertTrue(slot.isReleased());
-		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-	}
-
-	@Test
-	public void testReleaseResource() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerResource(resourceID);
-
-		JobID jobID = new JobID();
-
-		AllocationID allocationID1 = new AllocationID();
-		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-
-		AllocationID allocationID2 = new AllocationID();
-		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-
-		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
-		assertFalse(future2.isDone());
-
-		slotPool.releaseResource(resourceID);
-		assertTrue(slot1.isReleased());
-
-		// slot released and not usable, second allocation still not fulfilled
-		Thread.sleep(10);
-		assertFalse(future2.isDone());
-	}
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.runtime.instance;
+//
+//import org.apache.flink.api.common.JobID;
+//import org.apache.flink.api.common.time.Time;
+//import org.apache.flink.runtime.clusterframework.types.AllocationID;
+//import org.apache.flink.runtime.clusterframework.types.ResourceID;
+//import org.apache.flink.runtime.concurrent.BiFunction;
+//import org.apache.flink.runtime.concurrent.Future;
+//import org.apache.flink.runtime.jobgraph.JobVertexID;
+//import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+//import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+//import org.apache.flink.runtime.resourcemanager.SlotRequest;
+//import org.apache.flink.util.TestLogger;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Test;
+//
+//import java.util.UUID;
+//import java.util.concurrent.ExecutionException;
+//import java.util.concurrent.Executor;
+//import java.util.concurrent.ExecutorService;
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.TimeUnit;
+//
+//import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+//import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.junit.Assert.assertNotEquals;
+//import static org.junit.Assert.assertNotNull;
+//import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertTrue;
+//import static org.junit.Assert.fail;
+//import static org.mockito.Matchers.any;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.times;
+//import static org.mockito.Mockito.verify;
+//import static org.mockito.Mockito.when;
+//
+//public class SlotPoolTest extends TestLogger {
+//
+//	private ExecutorService executor;
+//
+//	private SlotPool slotPool;
+//
+//	private ResourceManagerGateway resourceManagerGateway;
+//
+//	@Before
+//	public void setUp() throws Exception {
+//		this.executor = Executors.newFixedThreadPool(1);
+//		this.slotPool = new SlotPool(executor);
+//		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
+//		when(resourceManagerGateway
+//			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+//			.thenReturn(mock(Future.class));
+//
+//		slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
+//		slotPool.setJobManagerLeaderId(UUID.randomUUID());
+//	}
+//
+//	@After
+//	public void tearDown() throws Exception {
+//	}
+//
+//	@Test
+//	public void testAllocateSimpleSlot() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobID jobID = new JobID();
+//		AllocationID allocationID = new AllocationID();
+//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+//		assertFalse(future.isDone());
+//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//
+//		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+//		assertTrue(future.isDone());
+//		assertTrue(slot.isAlive());
+//		assertEquals(resourceID, slot.getTaskManagerID());
+//		assertEquals(jobID, slot.getJobID());
+//		assertEquals(slotPool, slot.getOwner());
+//	}
+//
+//	@Test
+//	public void testAllocateSharedSlot() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobVertexID vid = new JobVertexID();
+//		SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
+//		SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
+//
+//		JobID jobID = new JobID();
+//		AllocationID allocationID = new AllocationID();
+//		Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
+//
+//		assertFalse(future.isDone());
+//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//
+//		SharedSlot slot = future.get(1, TimeUnit.SECONDS);
+//		assertTrue(future.isDone());
+//		assertTrue(slot.isAlive());
+//		assertEquals(resourceID, slot.getTaskManagerID());
+//		assertEquals(jobID, slot.getJobID());
+//		assertEquals(slotPool, slot.getOwner());
+//
+//		SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
+//		assertNotNull(simpleSlot);
+//		assertTrue(simpleSlot.isAlive());
+//	}
+//
+//	@Test
+//	public void testAllocateSlotWithoutResourceManager() throws Exception {
+//		slotPool.disconnectResourceManager();
+//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
+//		future.handleAsync(
+//			new BiFunction<SimpleSlot, Throwable, Void>() {
+//				@Override
+//				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+//					assertNull(simpleSlot);
+//					assertNotNull(throwable);
+//					return null;
+//				}
+//			},
+//			executor);
+//		try {
+//			future.get(1, TimeUnit.SECONDS);
+//			fail("We expected a ExecutionException.");
+//		} catch (ExecutionException ex) {
+//			// we expect the exception
+//		}
+//	}
+//
+//	@Test
+//	public void testAllocationFulfilledByReturnedSlot() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobID jobID = new JobID();
+//
+//		AllocationID allocationID1 = new AllocationID();
+//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+//
+//		AllocationID allocationID2 = new AllocationID();
+//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//		assertFalse(future1.isDone());
+//		assertFalse(future2.isDone());
+//		verify(resourceManagerGateway, times(2))
+//			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//		assertTrue(future1.isDone());
+//		assertFalse(future2.isDone());
+//
+//		// return this slot to pool
+//		slot1.releaseSlot();
+//
+//		// second allocation fulfilled by previous slot returning
+//		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+//		assertTrue(future2.isDone());
+//
+//		assertNotEquals(slot1, slot2);
+//		assertTrue(slot1.isReleased());
+//		assertTrue(slot2.isAlive());
+//		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+//		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+//	}
+//
+//	@Test
+//	public void testAllocateWithFreeSlot() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobID jobID = new JobID();
+//		AllocationID allocationID1 = new AllocationID();
+//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+//		assertFalse(future1.isDone());
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//		assertTrue(future1.isDone());
+//
+//		// return this slot to pool
+//		slot1.releaseSlot();
+//
+//		AllocationID allocationID2 = new AllocationID();
+//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//		// second allocation fulfilled by previous slot returning
+//		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+//		assertTrue(future2.isDone());
+//
+//		assertNotEquals(slot1, slot2);
+//		assertTrue(slot1.isReleased());
+//		assertTrue(slot2.isAlive());
+//		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+//		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+//	}
+//
+//	@Test
+//	public void testOfferSlot() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobID jobID = new JobID();
+//		AllocationID allocationID = new AllocationID();
+//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+//		assertFalse(future.isDone());
+//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+//
+//		// slot from unregistered resource
+//		SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
+//		assertFalse(slotPool.offerSlot(allocationID, invalid));
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//
+//		// reject offering with mismatch allocation id
+//		assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
+//
+//		// accepted slot
+//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+//		assertTrue(future.isDone());
+//		assertTrue(slot.isAlive());
+//
+//		// conflict offer with using slot
+//		SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertFalse(slotPool.offerSlot(allocationID, conflict));
+//
+//		// duplicated offer with using slot
+//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//		assertTrue(future.isDone());
+//		assertTrue(slot.isAlive());
+//
+//		// duplicated offer with free slot
+//		slot.releaseSlot();
+//		assertTrue(slot.isReleased());
+//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+//	}
+//
+//	@Test
+//	public void testReleaseResource() throws Exception {
+//		ResourceID resourceID = new ResourceID("resource");
+//		slotPool.registerResource(resourceID);
+//
+//		JobID jobID = new JobID();
+//
+//		AllocationID allocationID1 = new AllocationID();
+//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+//
+//		AllocationID allocationID2 = new AllocationID();
+//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+//
+//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+//
+//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+//		assertTrue(future1.isDone());
+//		assertFalse(future2.isDone());
+//
+//		slotPool.releaseResource(resourceID);
+//		assertTrue(slot1.isReleased());
+//
+//		// slot released and not usable, second allocation still not fulfilled
+//		Thread.sleep(10);
+//		assertFalse(future2.isDone());
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..f5b3892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.minicluster;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -31,7 +33,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-//	@Test
+	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 
@@ -74,6 +76,13 @@ public class MiniClusterITCase extends TestLogger {
 		task.setMaxParallelism(1);
 		task.setInvokableClass(NoOpInvokable.class);
 
-		return new JobGraph(new JobID(), "Test Job", task);
+		JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
+		jg.setAllowQueuedScheduling(true);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+		jg.setExecutionConfig(executionConfig);
+
+		return jg;
 	}
 }