You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:41 UTC

[8/9] flink git commit: [FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool

[FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool

This closes #6202.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53728132
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53728132
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53728132

Branch: refs/heads/master
Commit: 537281323adcbcf71cddfd824c56677e66c718ff
Parents: 029b9c0
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Thu Jun 21 19:43:42 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/jobmaster/JobMaster.java    |  2 +-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java   | 15 ++++++++-------
 .../runtime/jobmaster/slotpool/SlotPoolGateway.java  |  3 ++-
 .../jobmanager/scheduler/SchedulerTestBase.java      |  2 +-
 .../runtime/jobmaster/slotpool/SlotPoolTest.java     |  4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f96e0ae..aaa2f0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -648,7 +648,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());
 
 		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
+		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID, cause);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index b090cf8..6ab21c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -87,7 +87,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
  * eliminate ambiguities.
  *
- * TODO : Make pending requests location preference aware
+ * <p>TODO : Make pending requests location preference aware
  * TODO : Make pass location preferences to ResourceManager when sending a slot request
  */
 public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
@@ -219,7 +219,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		// release all registered slots by releasing the corresponding TaskExecutors
 		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
-			releaseTaskManagerInternal(taskManagerResourceId);
+			final FlinkException cause = new FlinkException(
+				"Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
+			releaseTaskManagerInternal(taskManagerResourceId, cause);
 		}
 
 		clear();
@@ -1043,11 +1045,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
 	 *
 	 * @param resourceId The id of the TaskManager
+	 * @param cause for the releasing of the TaskManager
 	 */
 	@Override
-	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause) {
 		if (registeredTaskManagers.remove(resourceId)) {
-			releaseTaskManagerInternal(resourceId);
+			releaseTaskManagerInternal(resourceId, cause);
 		}
 
 		return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1063,9 +1066,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		removePendingRequest(slotRequestId);
 	}
 
-	private void releaseTaskManagerInternal(final ResourceID resourceId) {
-		final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
-
+	private void releaseTaskManagerInternal(final ResourceID resourceId, final Exception cause) {
 		final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
 
 		for (AllocatedSlot allocatedSlot : removedSlots) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 1aad92a..34d9c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -86,9 +86,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
 	 *
 	 * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
+	 * @param cause for the releasing of the TaskManager
 	 * @return Future acknowledge which is completed after the TaskExecutor has been released
 	 */
-	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
+	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause);
 
 	/**
 	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 3d54412..940934f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -296,7 +296,7 @@ public class SchedulerTestBase extends TestLogger {
 		@Override
 		public void releaseTaskManager(ResourceID resourceId) {
 			try {
-				slotPool.releaseTaskManager(resourceId).get();
+				slotPool.releaseTaskManager(resourceId, null).get();
 			} catch (Exception e) {
 				throw new RuntimeException("Should not have happened.", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6f88e40..91ccc81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -378,7 +378,7 @@ public class SlotPoolTest extends TestLogger {
 
 			slot1.tryAssignPayload(dummyPayload);
 
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null);
 
 			releaseFuture.get();
 			assertFalse(slot1.isAlive());
@@ -718,7 +718,7 @@ public class SlotPoolTest extends TestLogger {
 				timeout);
 
 			// release the TaskExecutor before we get a response from the slot releasing
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();
 
 			// let the slot releasing fail --> since the owning TaskExecutor is no longer registered
 			// the slot should be discarded