You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/09/28 14:47:35 UTC
[ozone] branch master updated: HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new efb3a80f73 HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)
efb3a80f73 is described below
commit efb3a80f73fbda9ac730df7a5a245a7b094e59f2
Author: devmadhuu <de...@hotmail.com>
AuthorDate: Wed Sep 28 20:17:28 2022 +0530
HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)
---
.../scm/container/ContainerStateManagerImpl.java | 178 ++++++++++++---------
.../scm/container/states/ContainerAttribute.java | 14 +-
2 files changed, 110 insertions(+), 82 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index e5d279243a..f2631627f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -53,6 +53,9 @@ import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +74,9 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CL
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK_DEFAULT;
+
/**
* Default implementation of ContainerStateManager. This implementation
* holds the Container States in-memory which is backed by a persistent store.
@@ -81,6 +87,10 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DE
public final class ContainerStateManagerImpl
implements ContainerStateManager {
+ private ConfigurationSource confSrc;
+
+ private final LockManager<ContainerID> lockManager;
+
/**
* Logger instance of ContainerStateManagerImpl.
*/
@@ -125,7 +135,7 @@ public final class ContainerStateManagerImpl
// Protect containers and containerStore against the potential
// contentions between RaftServer and ContainerManager.
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
/**
* constructs ContainerStateManagerImpl instance and loads the containers
@@ -141,6 +151,7 @@ public final class ContainerStateManagerImpl
final Table<ContainerID, ContainerInfo> containerStore,
final DBTransactionBuffer buffer)
throws IOException {
+ this.confSrc = OzoneConfiguration.of(conf);
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
this.stateMachine = newStateMachine();
@@ -149,7 +160,10 @@ public final class ContainerStateManagerImpl
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
this.transactionBuffer = buffer;
-
+ boolean fair = conf.getBoolean(OZONE_MANAGER_FAIR_LOCK,
+ OZONE_MANAGER_FAIR_LOCK_DEFAULT);
+ this.lockManager =
+ new LockManager<>(confSrc, fair);
initialize();
}
@@ -280,11 +294,11 @@ public final class ContainerStateManagerImpl
@Override
public ContainerInfo getContainer(final ContainerID id) {
- lock.readLock().lock();
+ lockManager.readLock(id);
try {
return containers.getContainerInfo(id);
} finally {
- lock.readLock().unlock();
+ lockManager.readUnlock(id);
}
}
@@ -303,24 +317,30 @@ public final class ContainerStateManagerImpl
lock.writeLock().lock();
try {
- if (!containers.contains(containerID)) {
- ExecutionUtil.create(() -> {
- transactionBuffer.addToBuffer(containerStore, containerID, container);
- containers.addContainer(container);
- if (pipelineManager.containsPipeline(pipelineID)) {
- pipelineManager.addContainerToPipeline(pipelineID, containerID);
- } else if (containerInfo.getState().
- equals(HddsProtos.LifeCycleState.OPEN)) {
- // Pipeline should exist, but not
- throw new PipelineNotFoundException();
- }
- //recon may receive report of closed container,
- // no corresponding Pipeline can be synced for scm.
- // just only add the container.
- }).onException(() -> {
- containers.removeContainer(containerID);
- transactionBuffer.removeFromBuffer(containerStore, containerID);
- }).execute();
+ lockManager.writeLock(containerID);
+ try {
+ if (!containers.contains(containerID)) {
+ ExecutionUtil.create(() -> {
+ transactionBuffer.addToBuffer(containerStore,
+ containerID, container);
+ containers.addContainer(container);
+ if (pipelineManager.containsPipeline(pipelineID)) {
+ pipelineManager.addContainerToPipeline(pipelineID, containerID);
+ } else if (containerInfo.getState().
+ equals(LifeCycleState.OPEN)) {
+ // Pipeline should exist, but not
+ throw new PipelineNotFoundException();
+ }
+ //recon may receive report of closed container,
+ // no corresponding Pipeline can be synced for scm.
+ // just only add the container.
+ }).onException(() -> {
+ containers.removeContainer(containerID);
+ transactionBuffer.removeFromBuffer(containerStore, containerID);
+ }).execute();
+ }
+ } finally {
+ lockManager.writeUnlock(containerID);
}
} finally {
lock.writeLock().unlock();
@@ -329,11 +349,11 @@ public final class ContainerStateManagerImpl
@Override
public boolean contains(ContainerID id) {
- lock.readLock().lock();
+ lockManager.readLock(id);
try {
return containers.contains(id);
} finally {
- lock.readLock().unlock();
+ lockManager.readUnlock(id);
}
}
@@ -344,7 +364,7 @@ public final class ContainerStateManagerImpl
// TODO: Remove the protobuf conversion after fixing ContainerStateMap.
final ContainerID id = ContainerID.getFromProtobuf(containerID);
- lock.writeLock().lock();
+ lockManager.writeLock(id);
try {
if (containers.contains(id)) {
final ContainerInfo oldInfo = containers.getContainerInfo(id);
@@ -365,52 +385,54 @@ public final class ContainerStateManagerImpl
}
}
} finally {
- lock.writeLock().unlock();
+ lockManager.writeUnlock(id);
}
}
@Override
public Set<ContainerReplica> getContainerReplicas(final ContainerID id) {
- lock.readLock().lock();
+ lockManager.readLock(id);
try {
return containers.getContainerReplicas(id);
} finally {
- lock.readLock().unlock();
+ lockManager.readUnlock(id);
}
}
@Override
public void updateContainerReplica(final ContainerID id,
final ContainerReplica replica) {
- lock.writeLock().lock();
+ lockManager.writeLock(id);
try {
containers.updateContainerReplica(id, replica);
} finally {
- lock.writeLock().unlock();
+ lockManager.writeUnlock(id);
}
}
@Override
public void removeContainerReplica(final ContainerID id,
final ContainerReplica replica) {
- lock.writeLock().lock();
+ lockManager.writeLock(id);
try {
containers.removeContainerReplica(id,
replica);
} finally {
- lock.writeLock().unlock();
+ lockManager.writeUnlock(id);
}
}
@Override
public void updateDeleteTransactionId(
final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
- lock.writeLock().lock();
- try {
- // TODO: Refactor this. Error handling is not done.
- for (Map.Entry<ContainerID, Long> transaction :
- deleteTransactionMap.entrySet()) {
+
+ // TODO: Refactor this. Error handling is not done.
+ for (Map.Entry<ContainerID, Long> transaction :
+ deleteTransactionMap.entrySet()) {
+ ContainerID containerID = transaction.getKey();
+ try {
+ lockManager.writeLock(containerID);
final ContainerInfo info = containers.getContainerInfo(
transaction.getKey());
if (info == null) {
@@ -420,9 +442,9 @@ public final class ContainerStateManagerImpl
}
info.updateDeleteTransactionId(transaction.getValue());
transactionBuffer.addToBuffer(containerStore, info.containerID(), info);
+ } finally {
+ lockManager.writeUnlock(containerID);
}
- } finally {
- lock.writeLock().unlock();
}
}
@@ -438,48 +460,49 @@ public final class ContainerStateManagerImpl
final ContainerID lastID =
lastUsedMap.getOrDefault(key, containerIDs.first());
+
// There is a small issue here. The first time, we will skip the first
// container. But in most cases it will not matter.
NavigableSet<ContainerID> resultSet = containerIDs.tailSet(lastID, false);
if (resultSet.isEmpty()) {
resultSet = containerIDs;
}
+ ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
+ if (selectedContainer == null) {
+
+ // If we did not find any space in the tailSet, we need to look for
+ // space in the headset, we need to pass true to deal with the
+ // situation that we have a lone container that has space. That is we
+ // ignored the last used container under the assumption we can find
+ // other containers with space, but if have a single container that is
+ // not true. Hence we need to include the last used container as the
+ // last element in the sorted set.
+
+ resultSet = containerIDs.headSet(lastID, true);
+ selectedContainer = findContainerWithSpace(size, resultSet);
+ }
- lock.readLock().lock();
- try {
- ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
- if (selectedContainer == null) {
-
- // If we did not find any space in the tailSet, we need to look for
- // space in the headset, we need to pass true to deal with the
- // situation that we have a lone container that has space. That is we
- // ignored the last used container under the assumption we can find
- // other containers with space, but if have a single container that is
- // not true. Hence we need to include the last used container as the
- // last element in the sorted set.
-
- resultSet = containerIDs.headSet(lastID, true);
- selectedContainer = findContainerWithSpace(size, resultSet);
- }
-
- // TODO: cleanup entries in lastUsedMap
- if (selectedContainer != null) {
- lastUsedMap.put(key, selectedContainer.containerID());
- }
- return selectedContainer;
- } finally {
- lock.readLock().unlock();
+ // TODO: cleanup entries in lastUsedMap
+ if (selectedContainer != null) {
+ lastUsedMap.put(key, selectedContainer.containerID());
}
+ return selectedContainer;
}
private ContainerInfo findContainerWithSpace(final long size,
- final NavigableSet<ContainerID> searchSet) {
- // Get the container with space to meet our request.
+ final NavigableSet<ContainerID>
+ searchSet) {
+ // Get the container with space to meet our request.
for (ContainerID id : searchSet) {
- final ContainerInfo containerInfo = containers.getContainerInfo(id);
- if (containerInfo.getUsedBytes() + size <= this.containerSize) {
- containerInfo.updateLastUsedTime();
- return containerInfo;
+ try {
+ lockManager.readLock(id);
+ final ContainerInfo containerInfo = containers.getContainerInfo(id);
+ if (containerInfo.getUsedBytes() + size <= this.containerSize) {
+ containerInfo.updateLastUsedTime();
+ return containerInfo;
+ }
+ } finally {
+ lockManager.readUnlock(id);
}
}
return null;
@@ -488,14 +511,19 @@ public final class ContainerStateManagerImpl
public void removeContainer(final HddsProtos.ContainerID id)
throws IOException {
+ final ContainerID cid = ContainerID.getFromProtobuf(id);
lock.writeLock().lock();
try {
- final ContainerID cid = ContainerID.getFromProtobuf(id);
- final ContainerInfo containerInfo = containers.getContainerInfo(cid);
- ExecutionUtil.create(() -> {
- transactionBuffer.removeFromBuffer(containerStore, cid);
- containers.removeContainer(cid);
- }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+ lockManager.writeLock(cid);
+ try {
+ final ContainerInfo containerInfo = containers.getContainerInfo(cid);
+ ExecutionUtil.create(() -> {
+ transactionBuffer.removeFromBuffer(containerStore, cid);
+ containers.removeContainer(cid);
+ }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+ } finally {
+ lockManager.writeUnlock(cid);
+ }
} finally {
lock.writeLock().unlock();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
index 08be71f4ee..1e4eee0d66 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
@@ -24,13 +24,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
- .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* Each Attribute that we manage for a container is maintained as a map.
@@ -65,7 +64,7 @@ public class ContainerAttribute<T> {
private final Map<T, NavigableSet<ContainerID>> attributeMap;
private static final NavigableSet<ContainerID> EMPTY_SET = Collections
- .unmodifiableNavigableSet(new TreeSet<>());
+ .unmodifiableNavigableSet(new ConcurrentSkipListSet<>());
/**
* Creates a Container Attribute map from an existing Map.
@@ -80,7 +79,7 @@ public class ContainerAttribute<T> {
* Create an empty Container Attribute map.
*/
public ContainerAttribute() {
- this.attributeMap = new HashMap<>();
+ this.attributeMap = new ConcurrentHashMap<>();
}
/**
@@ -94,7 +93,8 @@ public class ContainerAttribute<T> {
public boolean insert(T key, ContainerID value) throws SCMException {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
- attributeMap.computeIfAbsent(key, any -> new TreeSet<>()).add(value);
+ attributeMap.computeIfAbsent(key, any ->
+ new ConcurrentSkipListSet<>()).add(value);
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org