You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/09/28 14:47:35 UTC

[ozone] branch master updated: HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new efb3a80f73 HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)
efb3a80f73 is described below

commit efb3a80f73fbda9ac730df7a5a245a7b094e59f2
Author: devmadhuu <de...@hotmail.com>
AuthorDate: Wed Sep 28 20:17:28 2022 +0530

    HDDS-7232. Introduce container-level lock while handling container report events by SCM and Recon (#3766)
---
 .../scm/container/ContainerStateManagerImpl.java   | 178 ++++++++++++---------
 .../scm/container/states/ContainerAttribute.java   |  14 +-
 2 files changed, 110 insertions(+), 82 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index e5d279243a..f2631627f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -53,6 +53,9 @@ import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +74,9 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CL
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK_DEFAULT;
+
 /**
  * Default implementation of ContainerStateManager. This implementation
  * holds the Container States in-memory which is backed by a persistent store.
@@ -81,6 +87,10 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DE
 public final class ContainerStateManagerImpl
     implements ContainerStateManager {
 
+  private ConfigurationSource confSrc;
+
+  private final LockManager<ContainerID> lockManager;
+
   /**
    * Logger instance of ContainerStateManagerImpl.
    */
@@ -125,7 +135,7 @@ public final class ContainerStateManagerImpl
 
   // Protect containers and containerStore against the potential
   // contentions between RaftServer and ContainerManager.
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   /**
    * constructs ContainerStateManagerImpl instance and loads the containers
@@ -141,6 +151,7 @@ public final class ContainerStateManagerImpl
       final Table<ContainerID, ContainerInfo> containerStore,
       final DBTransactionBuffer buffer)
       throws IOException {
+    this.confSrc = OzoneConfiguration.of(conf);
     this.pipelineManager = pipelineManager;
     this.containerStore = containerStore;
     this.stateMachine = newStateMachine();
@@ -149,7 +160,10 @@ public final class ContainerStateManagerImpl
     this.lastUsedMap = new ConcurrentHashMap<>();
     this.containerStateChangeActions = getContainerStateChangeActions();
     this.transactionBuffer = buffer;
-
+    boolean fair = conf.getBoolean(OZONE_MANAGER_FAIR_LOCK,
+        OZONE_MANAGER_FAIR_LOCK_DEFAULT);
+    this.lockManager =
+        new LockManager<>(confSrc, fair);
     initialize();
   }
 
@@ -280,11 +294,11 @@ public final class ContainerStateManagerImpl
 
   @Override
   public ContainerInfo getContainer(final ContainerID id) {
-    lock.readLock().lock();
+    lockManager.readLock(id);
     try {
       return containers.getContainerInfo(id);
     } finally {
-      lock.readLock().unlock();
+      lockManager.readUnlock(id);
     }
   }
 
@@ -303,24 +317,30 @@ public final class ContainerStateManagerImpl
 
     lock.writeLock().lock();
     try {
-      if (!containers.contains(containerID)) {
-        ExecutionUtil.create(() -> {
-          transactionBuffer.addToBuffer(containerStore, containerID, container);
-          containers.addContainer(container);
-          if (pipelineManager.containsPipeline(pipelineID)) {
-            pipelineManager.addContainerToPipeline(pipelineID, containerID);
-          } else if (containerInfo.getState().
-              equals(HddsProtos.LifeCycleState.OPEN)) {
-            // Pipeline should exist, but not
-            throw new PipelineNotFoundException();
-          }
-          //recon may receive report of closed container,
-          // no corresponding Pipeline can be synced for scm.
-          // just only add the container.
-        }).onException(() -> {
-          containers.removeContainer(containerID);
-          transactionBuffer.removeFromBuffer(containerStore, containerID);
-        }).execute();
+      lockManager.writeLock(containerID);
+      try {
+        if (!containers.contains(containerID)) {
+          ExecutionUtil.create(() -> {
+            transactionBuffer.addToBuffer(containerStore,
+                containerID, container);
+            containers.addContainer(container);
+            if (pipelineManager.containsPipeline(pipelineID)) {
+              pipelineManager.addContainerToPipeline(pipelineID, containerID);
+            } else if (containerInfo.getState().
+                equals(LifeCycleState.OPEN)) {
+              // Pipeline should exist, but it does not.
+              throw new PipelineNotFoundException();
+            }
+            // Recon may receive a report of a closed container for
+            // which no corresponding pipeline can be synced from SCM.
+            // In that case, only add the container.
+          }).onException(() -> {
+            containers.removeContainer(containerID);
+            transactionBuffer.removeFromBuffer(containerStore, containerID);
+          }).execute();
+        }
+      } finally {
+        lockManager.writeUnlock(containerID);
       }
     } finally {
       lock.writeLock().unlock();
@@ -329,11 +349,11 @@ public final class ContainerStateManagerImpl
 
   @Override
   public boolean contains(ContainerID id) {
-    lock.readLock().lock();
+    lockManager.readLock(id);
     try {
       return containers.contains(id);
     } finally {
-      lock.readLock().unlock();
+      lockManager.readUnlock(id);
     }
   }
 
@@ -344,7 +364,7 @@ public final class ContainerStateManagerImpl
     // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
     final ContainerID id = ContainerID.getFromProtobuf(containerID);
 
-    lock.writeLock().lock();
+    lockManager.writeLock(id);
     try {
       if (containers.contains(id)) {
         final ContainerInfo oldInfo = containers.getContainerInfo(id);
@@ -365,52 +385,54 @@ public final class ContainerStateManagerImpl
         }
       }
     } finally {
-      lock.writeLock().unlock();
+      lockManager.writeUnlock(id);
     }
   }
 
 
   @Override
   public Set<ContainerReplica> getContainerReplicas(final ContainerID id) {
-    lock.readLock().lock();
+    lockManager.readLock(id);
     try {
       return containers.getContainerReplicas(id);
     } finally {
-      lock.readLock().unlock();
+      lockManager.readUnlock(id);
     }
   }
 
   @Override
   public void updateContainerReplica(final ContainerID id,
                                      final ContainerReplica replica) {
-    lock.writeLock().lock();
+    lockManager.writeLock(id);
     try {
       containers.updateContainerReplica(id, replica);
     } finally {
-      lock.writeLock().unlock();
+      lockManager.writeUnlock(id);
     }
   }
 
   @Override
   public void removeContainerReplica(final ContainerID id,
                                      final ContainerReplica replica) {
-    lock.writeLock().lock();
+    lockManager.writeLock(id);
     try {
       containers.removeContainerReplica(id,
           replica);
     } finally {
-      lock.writeLock().unlock();
+      lockManager.writeUnlock(id);
     }
   }
 
   @Override
   public void updateDeleteTransactionId(
       final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
-    lock.writeLock().lock();
-    try {
-      // TODO: Refactor this. Error handling is not done.
-      for (Map.Entry<ContainerID, Long> transaction :
-          deleteTransactionMap.entrySet()) {
+
+    // TODO: Refactor this. Error handling is not done.
+    for (Map.Entry<ContainerID, Long> transaction :
+        deleteTransactionMap.entrySet()) {
+      ContainerID containerID = transaction.getKey();
+      try {
+        lockManager.writeLock(containerID);
         final ContainerInfo info = containers.getContainerInfo(
             transaction.getKey());
         if (info == null) {
@@ -420,9 +442,9 @@ public final class ContainerStateManagerImpl
         }
         info.updateDeleteTransactionId(transaction.getValue());
         transactionBuffer.addToBuffer(containerStore, info.containerID(), info);
+      } finally {
+        lockManager.writeUnlock(containerID);
       }
-    } finally {
-      lock.writeLock().unlock();
     }
   }
 
@@ -438,48 +460,49 @@ public final class ContainerStateManagerImpl
     final ContainerID lastID =
         lastUsedMap.getOrDefault(key, containerIDs.first());
 
+
     // There is a small issue here. The first time, we will skip the first
     // container. But in most cases it will not matter.
     NavigableSet<ContainerID> resultSet = containerIDs.tailSet(lastID, false);
     if (resultSet.isEmpty()) {
       resultSet = containerIDs;
     }
+    ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
+    if (selectedContainer == null) {
+
+      // If we did not find any space in the tailSet, we need to look for
+      // space in the headSet. We pass true to deal with the situation
+      // where we have a lone container that has space. That is, we
+      // ignored the last used container under the assumption that we can
+      // find other containers with space, but if we have a single container
+      // that is not true. Hence we need to include the last used container
+      // as the last element in the sorted set.
+
+      resultSet = containerIDs.headSet(lastID, true);
+      selectedContainer = findContainerWithSpace(size, resultSet);
+    }
 
-    lock.readLock().lock();
-    try {
-      ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
-      if (selectedContainer == null) {
-
-        // If we did not find any space in the tailSet, we need to look for
-        // space in the headset, we need to pass true to deal with the
-        // situation that we have a lone container that has space. That is we
-        // ignored the last used container under the assumption we can find
-        // other containers with space, but if have a single container that is
-        // not true. Hence we need to include the last used container as the
-        // last element in the sorted set.
-
-        resultSet = containerIDs.headSet(lastID, true);
-        selectedContainer = findContainerWithSpace(size, resultSet);
-      }
-
-      // TODO: cleanup entries in lastUsedMap
-      if (selectedContainer != null) {
-        lastUsedMap.put(key, selectedContainer.containerID());
-      }
-      return selectedContainer;
-    } finally {
-      lock.readLock().unlock();
+    // TODO: cleanup entries in lastUsedMap
+    if (selectedContainer != null) {
+      lastUsedMap.put(key, selectedContainer.containerID());
     }
+    return selectedContainer;
   }
 
   private ContainerInfo findContainerWithSpace(final long size,
-      final NavigableSet<ContainerID> searchSet) {
-    // Get the container with space to meet our request.
+                                               final NavigableSet<ContainerID>
+                                                   searchSet) {
+      // Get the container with space to meet our request.
     for (ContainerID id : searchSet) {
-      final ContainerInfo containerInfo = containers.getContainerInfo(id);
-      if (containerInfo.getUsedBytes() + size <= this.containerSize) {
-        containerInfo.updateLastUsedTime();
-        return containerInfo;
+      try {
+        lockManager.readLock(id);
+        final ContainerInfo containerInfo = containers.getContainerInfo(id);
+        if (containerInfo.getUsedBytes() + size <= this.containerSize) {
+          containerInfo.updateLastUsedTime();
+          return containerInfo;
+        }
+      } finally {
+        lockManager.readUnlock(id);
       }
     }
     return null;
@@ -488,14 +511,19 @@ public final class ContainerStateManagerImpl
 
   public void removeContainer(final HddsProtos.ContainerID id)
       throws IOException {
+    final ContainerID cid = ContainerID.getFromProtobuf(id);
     lock.writeLock().lock();
     try {
-      final ContainerID cid = ContainerID.getFromProtobuf(id);
-      final ContainerInfo containerInfo = containers.getContainerInfo(cid);
-      ExecutionUtil.create(() -> {
-        transactionBuffer.removeFromBuffer(containerStore, cid);
-        containers.removeContainer(cid);
-      }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+      lockManager.writeLock(cid);
+      try {
+        final ContainerInfo containerInfo = containers.getContainerInfo(cid);
+        ExecutionUtil.create(() -> {
+          transactionBuffer.removeFromBuffer(containerStore, cid);
+          containers.removeContainer(cid);
+        }).onException(() -> containerStore.put(cid, containerInfo)).execute();
+      } finally {
+        lockManager.writeUnlock(cid);
+      }
     } finally {
       lock.writeLock().unlock();
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
index 08be71f4ee..1e4eee0d66 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
@@ -24,13 +24,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
 
 /**
  * Each Attribute that we manage for a container is maintained as a map.
@@ -65,7 +64,7 @@ public class ContainerAttribute<T> {
 
   private final Map<T, NavigableSet<ContainerID>> attributeMap;
   private static final NavigableSet<ContainerID> EMPTY_SET =  Collections
-      .unmodifiableNavigableSet(new TreeSet<>());
+      .unmodifiableNavigableSet(new ConcurrentSkipListSet<>());
 
   /**
    * Creates a Container Attribute map from an existing Map.
@@ -80,7 +79,7 @@ public class ContainerAttribute<T> {
    * Create an empty Container Attribute map.
    */
   public ContainerAttribute() {
-    this.attributeMap = new HashMap<>();
+    this.attributeMap = new ConcurrentHashMap<>();
   }
 
   /**
@@ -94,7 +93,8 @@ public class ContainerAttribute<T> {
   public boolean insert(T key, ContainerID value) throws SCMException {
     Preconditions.checkNotNull(key);
     Preconditions.checkNotNull(value);
-    attributeMap.computeIfAbsent(key, any -> new TreeSet<>()).add(value);
+    attributeMap.computeIfAbsent(key, any ->
+        new ConcurrentSkipListSet<>()).add(value);
     return true;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org