You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/17 13:52:33 UTC

[flink] 01/02: [FLINK-9917][JM] Remove superfluous lock from SlotSharingManager

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d2bd39e361c53da99ae7383a51695cc896ec2bbe
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Jul 22 22:11:13 2018 +0200

    [FLINK-9917][JM] Remove superfluous lock from SlotSharingManager
    
    The SlotSharingManager is designed to be used by a single thread. Therefore,
    it is the responsibility of the caller to make sure that there is only a single
    thread at any given time accessing this component. Consequently, the component
    does not need to be synchronized.
    
    This closes #6389.
---
 .../jobmaster/slotpool/SlotSharingManager.java     | 92 ++++++++--------------
 1 file changed, 32 insertions(+), 60 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f..ef288a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
@@ -82,9 +81,6 @@ public class SlotSharingManager {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
 
-	/** Lock for the internal data structures. */
-	private final Object lock = new Object();
-
 	private final SlotSharingGroupId slotSharingGroupId;
 
 	/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
@@ -96,11 +92,9 @@ public class SlotSharingManager {
 	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
 
 	/** Root nodes which have not been completed because the allocated slot is still pending. */
-	@GuardedBy("lock")
 	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
 
 	/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
-	@GuardedBy("lock")
 	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
 
 	SlotSharingManager(
@@ -152,27 +146,23 @@ public class SlotSharingManager {
 
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
-		synchronized (lock) {
-			unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
-		}
+		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
 		// add the root node to the set of resolved root nodes once the SlotContext future has
 		// been completed and we know the slot's TaskManagerLocation
 		slotContextFuture.whenComplete(
 			(SlotContext slotContext, Throwable throwable) -> {
 				if (slotContext != null) {
-					synchronized (lock) {
-						final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
-						if (resolvedRootNode != null) {
-							LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+					if (resolvedRootNode != null) {
+						LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
 
-							final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-								slotContext.getTaskManagerLocation(),
-								taskManagerLocation -> new HashSet<>(4));
+						final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
+							slotContext.getTaskManagerLocation(),
+							taskManagerLocation -> new HashSet<>(4));
 
-							innerCollection.add(resolvedRootNode);
-						}
+						innerCollection.add(resolvedRootNode);
 					}
 				} else {
 					rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ public class SlotSharingManager {
 	 */
 	@Nullable
 	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
-		synchronized (lock) {
-			Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
-			return matcher.findMatchWithLocality(
-				slotProfile,
-				resolvedRootSlotsValues.stream().flatMap(Collection::stream),
-				(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
-				(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
-				MultiTaskSlotLocality::of);
-		}
+		Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
+		return matcher.findMatchWithLocality(
+			slotProfile,
+			resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+			(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
+			(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
+			MultiTaskSlotLocality::of);
 	}
 
 	/**
@@ -213,11 +201,9 @@ public class SlotSharingManager {
 	 */
 	@Nullable
 	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-		synchronized (lock) {
-			for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
-				if (!multiTaskSlot.contains(groupId)) {
-					return multiTaskSlot;
-				}
+		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
+			if (!multiTaskSlot.contains(groupId)) {
+				return multiTaskSlot;
 			}
 		}
 
@@ -228,11 +214,9 @@ public class SlotSharingManager {
 	public String toString() {
 		final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
 
-		synchronized (lock) {
-			builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-			builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-			builder.append("\tall=").append(allTaskSlots).append('\n');
-		}
+		builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+		builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+		builder.append("\tall=").append(allTaskSlots).append('\n');
 
 		return builder.append('}').toString();
 	}
@@ -479,26 +463,20 @@ public class SlotSharingManager {
 				parent.releaseChild(getGroupId());
 			} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
 				// we are the root node --> remove the root node from the list of task slots
+				final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId());
 
-				if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
-					synchronized (lock) {
-						// the root node should still be unresolved
-						unresolvedRootSlots.remove(getSlotRequestId());
-					}
-				} else {
+				if (unresolvedRootSlot == null) {
 					// the root node should be resolved --> we can access the slot context
 					final SlotContext slotContext = slotContextFuture.getNow(null);
 
 					if (slotContext != null) {
-						synchronized (lock) {
-							final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+						final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-							if (multiTaskSlots != null) {
-								multiTaskSlots.remove(this);
+						if (multiTaskSlots != null) {
+							multiTaskSlots.remove(this);
 
-								if (multiTaskSlots.isEmpty()) {
-									resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-								}
+							if (multiTaskSlots.isEmpty()) {
+								resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
 							}
 						}
 					}
@@ -637,9 +615,7 @@ public class SlotSharingManager {
 
 	@VisibleForTesting
 	Collection<MultiTaskSlot> getUnresolvedRootSlots() {
-		synchronized (lock) {
-			return unresolvedRootSlots.values();
-		}
+		return unresolvedRootSlots.values();
 	}
 
 	/**
@@ -649,19 +625,15 @@ public class SlotSharingManager {
 
 		@Override
 		public Iterator<MultiTaskSlot> iterator() {
-			synchronized (lock) {
-				return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
-			}
+			return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
 		}
 
 		@Override
 		public int size() {
 			int numberResolvedMultiTaskSlots = 0;
 
-			synchronized (lock) {
-				for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
-					numberResolvedMultiTaskSlots += multiTaskSlots.size();
-				}
+			for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
+				numberResolvedMultiTaskSlots += multiTaskSlots.size();
 			}
 
 			return numberResolvedMultiTaskSlots;