You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:02 UTC

[05/11] flink git commit: [hotfix] Improve logging of SlotPool and SlotSharingManager

[hotfix] Improve logging of SlotPool and SlotSharingManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19d39ec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19d39ec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19d39ec7

Branch: refs/heads/master
Commit: 19d39ec7ad2ed69fe81cea72299466bd7d6965e5
Parents: 4e616a8
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 19 13:41:03 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/AllocationID.java    |  5 +++
 .../flink/runtime/jobmaster/SlotRequestId.java  |  5 +++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 36 ++++++++++----------
 .../jobmaster/slotpool/SlotSharingManager.java  | 21 ++++++++++--
 4 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
index e722e9f..7004eff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -52,4 +52,9 @@ public class AllocationID extends AbstractID {
 	public AllocationID(long lowerPart, long upperPart) {
 		super(lowerPart, upperPart);
 	}
+
+	@Override
+	public String toString() {
+		return "AllocationID{" + super.toString() + '}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
index 203139c..5ac200d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -40,4 +40,9 @@ public final class SlotRequestId extends AbstractID {
 	}
 
 	public SlotRequestId() {}
+
+	@Override
+	public String toString() {
+		return "SlotRequestId{" + super.toString() + '}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 829c82e..13f0462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -323,7 +323,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			boolean allowQueuedScheduling,
 			Time allocationTimeout) {
 
-		log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+		log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
 
 		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
 
@@ -686,7 +686,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		checkNotNull(resourceManagerGateway);
 		checkNotNull(pendingRequest);
 
-		log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+		log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());
 
 		final AllocationID allocationId = new AllocationID();
 
@@ -723,7 +723,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
 			if (log.isDebugEnabled()) {
-				log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+				log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
 			}
 		}
 	}
@@ -731,7 +731,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
 		log.info("Cannot serve slot request, no ResourceManager connected. " +
-				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
+				"Adding as pending request [{}]",  pendingRequest.getSlotRequestId());
 
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
 	}
@@ -742,7 +742,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	@Override
 	public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-		log.debug("Releasing slot with slot request id {} because of {}.", slotRequestId, cause != null ? cause.getMessage() : "null");
+		log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null");
 
 		if (slotSharingGroupId != null) {
 			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
@@ -753,7 +753,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				if (taskSlot != null) {
 					taskSlot.release(cause);
 				} else {
-					log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
+					log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
 				}
 			} else {
 				log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
@@ -770,7 +770,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					allocatedSlot.releasePayload(cause);
 					tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 				} else {
-					log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId);
+					log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
 				}
 			}
 		}
@@ -801,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	}
 
 	private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
-		Preconditions.checkNotNull(pendingRequest);
-		Preconditions.checkNotNull(e);
+		checkNotNull(pendingRequest);
+		checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-			log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+			log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -833,7 +833,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
 		if (pendingRequest != null) {
-			log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+			log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
 				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
 			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
@@ -970,7 +970,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlots.remove(pendingRequest.getSlotRequestId());
 				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 			} else {
-				log.debug("Fulfilled slot request {} with allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID);
+				log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
 			}
 		}
 		else {
@@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			failPendingRequest(pendingRequest, cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
-			log.debug("Failed available slot with allocation id {}.", allocationID, cause);
+			log.debug("Failed available slot [{}].", allocationID, cause);
 		}
 		else {
 			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				log.trace("Outdated request to fail slot with allocation id {}.", allocationID, cause);
+				log.trace("Outdated request to fail slot [{}].", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1068,7 +1068,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	@VisibleForTesting
 	protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
-		log.info("Pending slot request {} timed out.", slotRequestId);
+		log.info("Pending slot request [{}] timed out.", slotRequestId);
 		removePendingRequest(slotRequestId);
 	}
 
@@ -1109,7 +1109,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			final AllocationID allocationID = expiredSlot.getAllocationId();
 			if (availableSlots.tryRemove(allocationID)) {
 
-				log.info("Releasing idle slot {}.", allocationID);
+				log.info("Releasing idle slot [{}].", allocationID);
 				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
 					allocationID,
 					cause,
@@ -1119,12 +1119,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					(Acknowledge ignored, Throwable throwable) -> {
 						if (throwable != null) {
 							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-								log.debug("Releasing slot {} of registered TaskExecutor {} failed. " +
+								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
 									"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
 									throwable);
 								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
 							} else {
-								log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " +
+								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
 									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
 							}
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index eaa5787..afcd24f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -77,6 +80,8 @@ import java.util.function.Function;
  */
 public class SlotSharingManager {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
+
 	/** Lock for the internal data structures. */
 	private final Object lock = new Object();
 
@@ -143,6 +148,8 @@ public class SlotSharingManager {
 			slotContextFuture,
 			allocatedSlotRequestId);
 
+		LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
+
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
 		synchronized (lock) {
@@ -158,6 +165,8 @@ public class SlotSharingManager {
 						final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
 						if (resolvedRootNode != null) {
+							LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+
 							final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
 								slotContext.getTaskManagerLocation(),
 								taskManagerLocation -> new HashSet<>(4));
@@ -384,6 +393,8 @@ public class SlotSharingManager {
 		MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
 			Preconditions.checkState(!super.contains(groupId));
 
+			LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group [{}].", slotRequestId, getSlotRequestId(), groupId);
+
 			final MultiTaskSlot inner = new MultiTaskSlot(
 				slotRequestId,
 				groupId,
@@ -412,6 +423,8 @@ public class SlotSharingManager {
 				Locality locality) {
 			Preconditions.checkState(!super.contains(groupId));
 
+			LOG.debug("Create single task slot [{}] in multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId);
+
 			final SingleTaskSlot leaf = new SingleTaskSlot(
 				slotRequestId,
 				groupId,
@@ -557,13 +570,15 @@ public class SlotSharingManager {
 			Preconditions.checkNotNull(locality);
 			singleLogicalSlotFuture = parent.getSlotContextFuture()
 				.thenApply(
-					(SlotContext slotContext) ->
-						new SingleLogicalSlot(
+					(SlotContext slotContext) -> {
+						LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+						return new SingleLogicalSlot(
 							slotRequestId,
 							slotContext,
 							slotSharingGroupId,
 							locality,
-							slotOwner));
+							slotOwner);
+					});
 		}
 
 		CompletableFuture<LogicalSlot> getLogicalSlotFuture() {