You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/10 14:21:41 UTC
[2/9] flink git commit: [hotfix] [runtime] Add toString() and print
methods to SlotPool classes as debugging/diagnostic helpers
[hotfix] [runtime] Add toString() and print methods to SlotPool classes as debugging/diagnostic helpers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a91bcee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a91bcee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a91bcee
Branch: refs/heads/master
Commit: 1a91bcee251a2a14ff7856b175af88ca726ef1cc
Parents: 6ce67c1
Author: Stephan Ewen <se...@apache.org>
Authored: Sat May 5 17:58:20 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 10 16:18:31 2018 +0200
----------------------------------------------------------------------
.../runtime/jobmaster/slotpool/SlotPool.java | 50 ++++++++++++++++++++
1 file changed, 50 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a91bcee/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index c9366c3..6ea74d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -308,6 +309,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
boolean allowQueuedScheduling,
Time allocationTimeout) {
+ log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+
final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
if (slotSharingGroupId != null) {
@@ -1135,6 +1138,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
// Methods for tests
// ------------------------------------------------------------------------
+ private String printStatus() { // Debug helper: builds and returns a multi-line, human-readable dump of this pool's state.
+ validateRunsInMainThread(); // presumably asserts the caller runs in the endpoint's main thread (helper defined outside this hunk) — TODO confirm
+
+ final StringBuilder builder = new StringBuilder(1024).append("Slot Pool Status:\n"); // 1024 is only the initial buffer capacity
+
+ builder.append("\tstatus: ");
+ if (resourceManagerGateway != null) { // a non-null gateway is reported as "connected", together with its address
+ builder.append("connected to ").append(resourceManagerGateway.getAddress()).append('\n');
+ } else {
+ builder.append("unconnected and waiting for ResourceManager ")
+ .append(waitingForResourceManager) // rendered via its toString(); presumably the requests parked until a ResourceManager connects — TODO confirm
+ .append('\n');
+ }
+
+ builder.append("\tregistered TaskManagers: ").append(registeredTaskManagers).append('\n'); // rendered via the field's toString()
+
+ builder.append("\tavailable slots: ").append(availableSlots.printAllSlots()).append('\n'); // each slot container supplies its own string dump
+ builder.append("\tallocated slots: ").append(allocatedSlots.printAllSlots()).append('\n');
+
+ builder.append("\tpending requests: ").append(pendingRequests.values()).append('\n'); // element formatting is that of the request objects' toString()
+
+ builder.append("\tsharing groups: {\n");
+ for (Entry<SlotSharingGroupId, SlotSharingManager> manager : slotSharingManagers.entrySet()) { // note: 'manager' is the map entry (group id -> manager), not the manager itself
+ builder.append("\t -------- ").append(manager.getKey()).append(" --------\n"); // separator line carrying the sharing group id
+ builder.append(manager.getValue()); // uses SlotSharingManager's toString(); no newline is appended after it here
+ }
+ builder.append("\t}\n");
+ return builder.toString();
+ }
+
@VisibleForTesting
protected AllocatedSlots getAllocatedSlots() {
return allocatedSlots;
@@ -1291,6 +1324,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlotsByTaskManager.clear();
}
+ String printAllSlots() { // Debug helper: returns a string rendering of every value stored in allocatedSlotsByTaskManager.
+ return allocatedSlotsByTaskManager.values().toString(); // output format is whatever the values() collection's toString() produces
+ }
+
@VisibleForTesting
boolean containResource(final ResourceID resourceID) {
return allocatedSlotsByTaskManager.containsKey(resourceID);
@@ -1480,6 +1517,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
}
}
+ String printAllSlots() { // Debug helper: returns a string rendering of every value stored in the availableSlots map.
+ return availableSlots.values().toString(); // output format is whatever the values() collection's toString() produces
+ }
+
@VisibleForTesting
boolean containsTaskManager(ResourceID resourceID) {
return availableSlotsByTaskManager.containsKey(resourceID);
@@ -1593,6 +1634,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
public ResourceProfile getResourceProfile() { // Plain accessor: hands back the resourceProfile field unchanged.
return resourceProfile;
}
+
+ @Override
+ public String toString() { // Diagnostic rendering of a pending request: "PendingRequest{...}" listing the three fields below.
+ return "PendingRequest{" +
+ "slotRequestId=" + slotRequestId +
+ ", resourceProfile=" + resourceProfile +
+ ", allocatedSlotFuture=" + allocatedSlotFuture + // concatenation uses the future object's own toString()
+ '}';
+ }
}
// ------------------------------------------------------------------------