You are viewing a plain text version of this content. The canonical link for it appeared as a hyperlink in the original archive page.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:41 UTC
[8/9] flink git commit: [FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool
[FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool
This closes #6202.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53728132
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53728132
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53728132
Branch: refs/heads/master
Commit: 537281323adcbcf71cddfd824c56677e66c718ff
Parents: 029b9c0
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Thu Jun 21 19:43:42 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/jobmaster/JobMaster.java | 2 +-
.../flink/runtime/jobmaster/slotpool/SlotPool.java | 15 ++++++++-------
.../runtime/jobmaster/slotpool/SlotPoolGateway.java | 3 ++-
.../jobmanager/scheduler/SchedulerTestBase.java | 2 +-
.../runtime/jobmaster/slotpool/SlotPoolTest.java | 4 ++--
5 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f96e0ae..aaa2f0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -648,7 +648,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
- CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
+ CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID, cause);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index b090cf8..6ab21c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -87,7 +87,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
* eliminate ambiguities.
*
- * TODO : Make pending requests location preference aware
+ * <p>TODO : Make pending requests location preference aware
* TODO : Make pass location preferences to ResourceManager when sending a slot request
*/
public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
@@ -219,7 +219,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
// release all registered slots by releasing the corresponding TaskExecutors
for (ResourceID taskManagerResourceId : registeredTaskManagers) {
- releaseTaskManagerInternal(taskManagerResourceId);
+ final FlinkException cause = new FlinkException(
+ "Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
+ releaseTaskManagerInternal(taskManagerResourceId, cause);
}
clear();
@@ -1043,11 +1045,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
* when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
*
* @param resourceId The id of the TaskManager
+ * @param cause for the releasing of the TaskManager
*/
@Override
- public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+ public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause) {
if (registeredTaskManagers.remove(resourceId)) {
- releaseTaskManagerInternal(resourceId);
+ releaseTaskManagerInternal(resourceId, cause);
}
return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1063,9 +1066,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
removePendingRequest(slotRequestId);
}
- private void releaseTaskManagerInternal(final ResourceID resourceId) {
- final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
-
+ private void releaseTaskManagerInternal(final ResourceID resourceId, final Exception cause) {
final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
for (AllocatedSlot allocatedSlot : removedSlots) {
http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 1aad92a..34d9c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -86,9 +86,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
* Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
*
* @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
+ * @param cause for the releasing of the TaskManager
* @return Future acknowledge which is completed after the TaskExecutor has been released
*/
- CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
+ CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause);
/**
* Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 3d54412..940934f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -296,7 +296,7 @@ public class SchedulerTestBase extends TestLogger {
@Override
public void releaseTaskManager(ResourceID resourceId) {
try {
- slotPool.releaseTaskManager(resourceId).get();
+ slotPool.releaseTaskManager(resourceId, null).get();
} catch (Exception e) {
throw new RuntimeException("Should not have happened.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6f88e40..91ccc81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -378,7 +378,7 @@ public class SlotPoolTest extends TestLogger {
slot1.tryAssignPayload(dummyPayload);
- slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
+ slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null);
releaseFuture.get();
assertFalse(slot1.isAlive());
@@ -718,7 +718,7 @@ public class SlotPoolTest extends TestLogger {
timeout);
// release the TaskExecutor before we get a response from the slot releasing
- slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
+ slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();
// let the slot releasing fail --> since the owning TaskExecutor is no longer registered
// the slot should be discarded