You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:02 UTC
[05/11] flink git commit: [hotfix] Improve logging of SlotPool and SlotSharingManager
[hotfix] Improve logging of SlotPool and SlotSharingManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19d39ec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19d39ec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19d39ec7
Branch: refs/heads/master
Commit: 19d39ec7ad2ed69fe81cea72299466bd7d6965e5
Parents: 4e616a8
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 19 13:41:03 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200
----------------------------------------------------------------------
.../clusterframework/types/AllocationID.java | 5 +++
.../flink/runtime/jobmaster/SlotRequestId.java | 5 +++
.../runtime/jobmaster/slotpool/SlotPool.java | 36 ++++++++++----------
.../jobmaster/slotpool/SlotSharingManager.java | 21 ++++++++++--
4 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
index e722e9f..7004eff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -52,4 +52,9 @@ public class AllocationID extends AbstractID {
public AllocationID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
+
+ @Override
+ public String toString() {
+ return "AllocationID{" + super.toString() + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
index 203139c..5ac200d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -40,4 +40,9 @@ public final class SlotRequestId extends AbstractID {
}
public SlotRequestId() {}
+
+ @Override
+ public String toString() {
+ return "SlotRequestId{" + super.toString() + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 829c82e..13f0462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -323,7 +323,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
boolean allowQueuedScheduling,
Time allocationTimeout) {
- log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+ log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
@@ -686,7 +686,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
checkNotNull(resourceManagerGateway);
checkNotNull(pendingRequest);
- log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+ log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());
final AllocationID allocationId = new AllocationID();
@@ -723,7 +723,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
"No pooled slot available and request to ResourceManager for new slot failed", failure));
} else {
if (log.isDebugEnabled()) {
- log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+ log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
}
}
}
@@ -731,7 +731,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
log.info("Cannot serve slot request, no ResourceManager connected. " +
- "Adding as pending request {}", pendingRequest.getSlotRequestId());
+ "Adding as pending request [{}]", pendingRequest.getSlotRequestId());
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
}
@@ -742,7 +742,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
@Override
public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
- log.debug("Releasing slot with slot request id {} because of {}.", slotRequestId, cause != null ? cause.getMessage() : "null");
+ log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null");
if (slotSharingGroupId != null) {
final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
@@ -753,7 +753,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
if (taskSlot != null) {
taskSlot.release(cause);
} else {
- log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
+ log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
}
} else {
log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
@@ -770,7 +770,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlot.releasePayload(cause);
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
} else {
- log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId);
+ log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
}
}
}
@@ -801,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
}
private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
- Preconditions.checkNotNull(pendingRequest);
- Preconditions.checkNotNull(e);
+ checkNotNull(pendingRequest);
+ checkNotNull(e);
if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
- log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+ log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage());
pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
}
}
@@ -833,7 +833,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
if (pendingRequest != null) {
- log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+ log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
@@ -970,7 +970,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlots.remove(pendingRequest.getSlotRequestId());
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
} else {
- log.debug("Fulfilled slot request {} with allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID);
+ log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
}
}
else {
@@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
failPendingRequest(pendingRequest, cause);
}
else if (availableSlots.tryRemove(allocationID)) {
- log.debug("Failed available slot with allocation id {}.", allocationID, cause);
+ log.debug("Failed available slot [{}].", allocationID, cause);
}
else {
AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlot.releasePayload(cause);
}
else {
- log.trace("Outdated request to fail slot with allocation id {}.", allocationID, cause);
+ log.trace("Outdated request to fail slot [{}].", allocationID, cause);
}
}
// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1068,7 +1068,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
@VisibleForTesting
protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
- log.info("Pending slot request {} timed out.", slotRequestId);
+ log.info("Pending slot request [{}] timed out.", slotRequestId);
removePendingRequest(slotRequestId);
}
@@ -1109,7 +1109,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
final AllocationID allocationID = expiredSlot.getAllocationId();
if (availableSlots.tryRemove(allocationID)) {
- log.info("Releasing idle slot {}.", allocationID);
+ log.info("Releasing idle slot [{}].", allocationID);
final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
allocationID,
cause,
@@ -1119,12 +1119,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
- log.debug("Releasing slot {} of registered TaskExecutor {} failed. " +
+ log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
throwable);
tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
} else {
- log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " +
+ log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index eaa5787..afcd24f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -77,6 +80,8 @@ import java.util.function.Function;
*/
public class SlotSharingManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
+
/** Lock for the internal data structures. */
private final Object lock = new Object();
@@ -143,6 +148,8 @@ public class SlotSharingManager {
slotContextFuture,
allocatedSlotRequestId);
+ LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
+
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
synchronized (lock) {
@@ -158,6 +165,8 @@ public class SlotSharingManager {
final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
if (resolvedRootNode != null) {
+ LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+
final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
slotContext.getTaskManagerLocation(),
taskManagerLocation -> new HashSet<>(4));
@@ -384,6 +393,8 @@ public class SlotSharingManager {
MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
Preconditions.checkState(!super.contains(groupId));
+ LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group [{}].", slotRequestId, getSlotRequestId(), groupId);
+
final MultiTaskSlot inner = new MultiTaskSlot(
slotRequestId,
groupId,
@@ -412,6 +423,8 @@ public class SlotSharingManager {
Locality locality) {
Preconditions.checkState(!super.contains(groupId));
+ LOG.debug("Create single task slot [{}] in multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId);
+
final SingleTaskSlot leaf = new SingleTaskSlot(
slotRequestId,
groupId,
@@ -557,13 +570,15 @@ public class SlotSharingManager {
Preconditions.checkNotNull(locality);
singleLogicalSlotFuture = parent.getSlotContextFuture()
.thenApply(
- (SlotContext slotContext) ->
- new SingleLogicalSlot(
+ (SlotContext slotContext) -> {
+ LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+ return new SingleLogicalSlot(
slotRequestId,
slotContext,
slotSharingGroupId,
locality,
- slotOwner));
+ slotOwner);
+ });
}
CompletableFuture<LogicalSlot> getLogicalSlotFuture() {