You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/17 13:54:18 UTC

[GitHub] asfgit closed pull request #6389: [FLINK-9917][JM] Remove superfluous lock from SlotSharingManager

asfgit closed pull request #6389: [FLINK-9917][JM] Remove superfluous lock from SlotSharingManager
URL: https://github.com/apache/flink/pull/6389
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
@@ -82,9 +81,6 @@
 
 	private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
 
-	/** Lock for the internal data structures. */
-	private final Object lock = new Object();
-
 	private final SlotSharingGroupId slotSharingGroupId;
 
 	/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
@@ -96,11 +92,9 @@
 	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
 
 	/** Root nodes which have not been completed because the allocated slot is still pending. */
-	@GuardedBy("lock")
 	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
 
 	/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
-	@GuardedBy("lock")
 	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
 
 	SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
 
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
-		synchronized (lock) {
-			unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
-		}
+		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
 		// add the root node to the set of resolved root nodes once the SlotContext future has
 		// been completed and we know the slot's TaskManagerLocation
 		slotContextFuture.whenComplete(
 			(SlotContext slotContext, Throwable throwable) -> {
 				if (slotContext != null) {
-					synchronized (lock) {
-						final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
-						if (resolvedRootNode != null) {
-							LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+					if (resolvedRootNode != null) {
+						LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
 
-							final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-								slotContext.getTaskManagerLocation(),
-								taskManagerLocation -> new HashSet<>(4));
+						final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
+							slotContext.getTaskManagerLocation(),
+							taskManagerLocation -> new HashSet<>(4));
 
-							innerCollection.add(resolvedRootNode);
-						}
+						innerCollection.add(resolvedRootNode);
 					}
 				} else {
 					rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
 	 */
 	@Nullable
 	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
-		synchronized (lock) {
-			Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
-			return matcher.findMatchWithLocality(
-				slotProfile,
-				resolvedRootSlotsValues.stream().flatMap(Collection::stream),
-				(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
-				(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
-				MultiTaskSlotLocality::of);
-		}
+		Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
+		return matcher.findMatchWithLocality(
+			slotProfile,
+			resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+			(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
+			(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
+			MultiTaskSlotLocality::of);
 	}
 
 	/**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy
 	 */
 	@Nullable
 	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-		synchronized (lock) {
-			for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
-				if (!multiTaskSlot.contains(groupId)) {
-					return multiTaskSlot;
-				}
+		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
+			if (!multiTaskSlot.contains(groupId)) {
+				return multiTaskSlot;
 			}
 		}
 
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
 	public String toString() {
 		final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
 
-		synchronized (lock) {
-			builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-			builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-			builder.append("\tall=").append(allTaskSlots).append('\n');
-		}
+		builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+		builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+		builder.append("\tall=").append(allTaskSlots).append('\n');
 
 		return builder.append('}').toString();
 	}
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
 				parent.releaseChild(getGroupId());
 			} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
 				// we are the root node --> remove the root node from the list of task slots
+				final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId());
 
-				if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
-					synchronized (lock) {
-						// the root node should still be unresolved
-						unresolvedRootSlots.remove(getSlotRequestId());
-					}
-				} else {
+				if (unresolvedRootSlot == null) {
 					// the root node should be resolved --> we can access the slot context
 					final SlotContext slotContext = slotContextFuture.getNow(null);
 
 					if (slotContext != null) {
-						synchronized (lock) {
-							final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+						final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
 
-							if (multiTaskSlots != null) {
-								multiTaskSlots.remove(this);
+						if (multiTaskSlots != null) {
+							multiTaskSlots.remove(this);
 
-								if (multiTaskSlots.isEmpty()) {
-									resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
-								}
+							if (multiTaskSlots.isEmpty()) {
+								resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
 							}
 						}
 					}
@@ -637,9 +615,7 @@ public String toString() {
 
 	@VisibleForTesting
 	Collection<MultiTaskSlot> getUnresolvedRootSlots() {
-		synchronized (lock) {
-			return unresolvedRootSlots.values();
-		}
+		return unresolvedRootSlots.values();
 	}
 
 	/**
@@ -649,19 +625,15 @@ public String toString() {
 
 		@Override
 		public Iterator<MultiTaskSlot> iterator() {
-			synchronized (lock) {
-				return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
-			}
+			return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
 		}
 
 		@Override
 		public int size() {
 			int numberResolvedMultiTaskSlots = 0;
 
-			synchronized (lock) {
-				for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
-					numberResolvedMultiTaskSlots += multiTaskSlots.size();
-				}
+			for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
+				numberResolvedMultiTaskSlots += multiTaskSlots.size();
 			}
 
 			return numberResolvedMultiTaskSlots;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services