You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/17 13:52:33 UTC
[flink] 01/02: [FLINK-9917][JM] Remove superfluous lock from
SlotSharingManager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d2bd39e361c53da99ae7383a51695cc896ec2bbe
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Jul 22 22:11:13 2018 +0200
[FLINK-9917][JM] Remove superfluous lock from SlotSharingManager
The SlotSharingManager is designed to be used by a single thread. Therefore,
it is the responsibility of the caller to make sure that there is only a single
thread at any given time accessing this component. Consequently, the component
does not need to be synchronized.
This closes #6389.
---
.../jobmaster/slotpool/SlotSharingManager.java | 92 ++++++++--------------
1 file changed, 32 insertions(+), 60 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f..ef288a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import java.util.AbstractCollection;
import java.util.Collection;
@@ -82,9 +81,6 @@ public class SlotSharingManager {
private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
- /** Lock for the internal data structures. */
- private final Object lock = new Object();
-
private final SlotSharingGroupId slotSharingGroupId;
/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
@@ -96,11 +92,9 @@ public class SlotSharingManager {
private final Map<SlotRequestId, TaskSlot> allTaskSlots;
/** Root nodes which have not been completed because the allocated slot is still pending. */
- @GuardedBy("lock")
private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
- @GuardedBy("lock")
private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
SlotSharingManager(
@@ -152,27 +146,23 @@ public class SlotSharingManager {
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
- synchronized (lock) {
- unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
- }
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation
slotContextFuture.whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (slotContext != null) {
- synchronized (lock) {
- final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+ final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
- if (resolvedRootNode != null) {
- LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+ if (resolvedRootNode != null) {
+ LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
- final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
- slotContext.getTaskManagerLocation(),
- taskManagerLocation -> new HashSet<>(4));
+ final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
+ slotContext.getTaskManagerLocation(),
+ taskManagerLocation -> new HashSet<>(4));
- innerCollection.add(resolvedRootNode);
- }
+ innerCollection.add(resolvedRootNode);
}
} else {
rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ public class SlotSharingManager {
*/
@Nullable
MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
- synchronized (lock) {
- Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
- return matcher.findMatchWithLocality(
- slotProfile,
- resolvedRootSlotsValues.stream().flatMap(Collection::stream),
- (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
- (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
- MultiTaskSlotLocality::of);
- }
+ Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
+ return matcher.findMatchWithLocality(
+ slotProfile,
+ resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+ (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
+ (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
+ MultiTaskSlotLocality::of);
}
/**
@@ -213,11 +201,9 @@ public class SlotSharingManager {
*/
@Nullable
MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
- synchronized (lock) {
- for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
- if (!multiTaskSlot.contains(groupId)) {
- return multiTaskSlot;
- }
+ for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return multiTaskSlot;
}
}
@@ -228,11 +214,9 @@ public class SlotSharingManager {
public String toString() {
final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
- synchronized (lock) {
- builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
- builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
- builder.append("\tall=").append(allTaskSlots).append('\n');
- }
+ builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+ builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+ builder.append("\tall=").append(allTaskSlots).append('\n');
return builder.append('}').toString();
}
@@ -479,26 +463,20 @@ public class SlotSharingManager {
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
// we are the root node --> remove the root node from the list of task slots
+ final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId());
- if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
- synchronized (lock) {
- // the root node should still be unresolved
- unresolvedRootSlots.remove(getSlotRequestId());
- }
- } else {
+ if (unresolvedRootSlot == null) {
// the root node should be resolved --> we can access the slot context
final SlotContext slotContext = slotContextFuture.getNow(null);
if (slotContext != null) {
- synchronized (lock) {
- final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+ final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
- if (multiTaskSlots != null) {
- multiTaskSlots.remove(this);
+ if (multiTaskSlots != null) {
+ multiTaskSlots.remove(this);
- if (multiTaskSlots.isEmpty()) {
- resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
- }
+ if (multiTaskSlots.isEmpty()) {
+ resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
}
}
@@ -637,9 +615,7 @@ public class SlotSharingManager {
@VisibleForTesting
Collection<MultiTaskSlot> getUnresolvedRootSlots() {
- synchronized (lock) {
- return unresolvedRootSlots.values();
- }
+ return unresolvedRootSlots.values();
}
/**
@@ -649,19 +625,15 @@ public class SlotSharingManager {
@Override
public Iterator<MultiTaskSlot> iterator() {
- synchronized (lock) {
- return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
- }
+ return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
}
@Override
public int size() {
int numberResolvedMultiTaskSlots = 0;
- synchronized (lock) {
- for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
- numberResolvedMultiTaskSlots += multiTaskSlots.size();
- }
+ for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
+ numberResolvedMultiTaskSlots += multiTaskSlots.size();
}
return numberResolvedMultiTaskSlots;