You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 17:57:28 UTC

[2/2] flink git commit: [FLINK-5793] [runtime] Fix bug where a running slot may not be added to AllocatedMap in SlotPool

[FLINK-5793] [runtime] Fix bug where a running slot may not be added to AllocatedMap in SlotPool

This closes #3306


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33ea78ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33ea78ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33ea78ea

Branch: refs/heads/master
Commit: 33ea78ea37fab819a329b09aa213e61c16252067
Parents: 7477c5b
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Feb 14 14:56:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 18:45:46 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/instance/SlotPool.java    | 13 ++++++++++++-
 .../apache/flink/runtime/instance/SlotPoolTest.java    |  2 ++
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/33ea78ea/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 672431e..4da6c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -436,7 +436,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
 							pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());
 
-					pendingRequest.future().complete(createSimpleSlot(taskManagerSlot, Locality.UNKNOWN));
+					SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
+					allocatedSlots.add(newSlot);
+					pendingRequest.future().complete(newSlot);
 				}
 				else {
 					LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
@@ -627,6 +629,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Methods for tests
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	AllocatedSlots getAllocatedSlots() {
+		return allocatedSlots;
+	}
+
+	// ------------------------------------------------------------------------
 	//  Helper classes
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33ea78ea/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 97457e1..538e286 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -115,6 +115,7 @@ public class SlotPoolTest extends TestLogger {
 		assertEquals(resourceID, slot.getTaskManagerID());
 		assertEquals(jobId, slot.getJobID());
 		assertEquals(slotPool.getSlotOwner(), slot.getOwner());
+		assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
 	}
 
 	@Test
@@ -153,6 +154,7 @@ public class SlotPoolTest extends TestLogger {
 		assertTrue(slot2.isAlive());
 		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
 		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+		assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
 	}
 
 	@Test