You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/10 14:21:41 UTC

[2/9] flink git commit: [hotfix] [runtime] Add toString() and print methods to SlotPool classes for as debugging/diagnostic helpers

[hotfix] [runtime] Add toString() and print methods to SlotPool classes for as debugging/diagnostic helpers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a91bcee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a91bcee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a91bcee

Branch: refs/heads/master
Commit: 1a91bcee251a2a14ff7856b175af88ca726ef1cc
Parents: 6ce67c1
Author: Stephan Ewen <se...@apache.org>
Authored: Sat May 5 17:58:20 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 10 16:18:31 2018 +0200

----------------------------------------------------------------------
 .../runtime/jobmaster/slotpool/SlotPool.java    | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a91bcee/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index c9366c3..6ea74d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -308,6 +309,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			boolean allowQueuedScheduling,
 			Time allocationTimeout) {
 
+		log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+
 		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
 
 		if (slotSharingGroupId != null) {
@@ -1135,6 +1138,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	//  Methods for tests
 	// ------------------------------------------------------------------------
 
+	private String printStatus() {
+		validateRunsInMainThread();
+
+		final StringBuilder builder = new StringBuilder(1024).append("Slot Pool Status:\n");
+
+		builder.append("\tstatus: ");
+		if (resourceManagerGateway != null) {
+			builder.append("connected to ").append(resourceManagerGateway.getAddress()).append('\n');
+		} else {
+			builder.append("unconnected and waiting for ResourceManager ")
+					.append(waitingForResourceManager)
+					.append('\n');
+		}
+
+		builder.append("\tregistered TaskManagers: ").append(registeredTaskManagers).append('\n');
+
+		builder.append("\tavailable slots: ").append(availableSlots.printAllSlots()).append('\n');
+		builder.append("\tallocated slots: ").append(allocatedSlots.printAllSlots()).append('\n');
+
+		builder.append("\tpending requests: ").append(pendingRequests.values()).append('\n');
+
+		builder.append("\tsharing groups: {\n");
+		for (Entry<SlotSharingGroupId, SlotSharingManager> manager : slotSharingManagers.entrySet()) {
+			builder.append("\t -------- ").append(manager.getKey()).append(" --------\n");
+			builder.append(manager.getValue());
+		}
+		builder.append("\t}\n");
+		return builder.toString();
+	}
+
 	@VisibleForTesting
 	protected AllocatedSlots getAllocatedSlots() {
 		return allocatedSlots;
@@ -1291,6 +1324,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			allocatedSlotsByTaskManager.clear();
 		}
 
+		String printAllSlots() {
+			return allocatedSlotsByTaskManager.values().toString();
+		}
+
 		@VisibleForTesting
 		boolean containResource(final ResourceID resourceID) {
 			return allocatedSlotsByTaskManager.containsKey(resourceID);
@@ -1480,6 +1517,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			}
 		}
 
+		String printAllSlots() {
+			return availableSlots.values().toString();
+		}
+
 		@VisibleForTesting
 		boolean containsTaskManager(ResourceID resourceID) {
 			return availableSlotsByTaskManager.containsKey(resourceID);
@@ -1593,6 +1634,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		public ResourceProfile getResourceProfile() {
 			return resourceProfile;
 		}
+
+		@Override
+		public String toString() {
+			return "PendingRequest{" +
+					"slotRequestId=" + slotRequestId +
+					", resourceProfile=" + resourceProfile +
+					", allocatedSlotFuture=" + allocatedSlotFuture +
+					'}';
+		}
 	}
 
 	// ------------------------------------------------------------------------