You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/09/14 18:36:58 UTC

hadoop git commit: HDDS-429. StorageContainerManager lock optimization. Contributed by Nanda Kumar.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 144a55f0e -> 0c8a43b9e


HDDS-429. StorageContainerManager lock optimization.
Contributed by Nanda Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c8a43b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c8a43b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c8a43b9

Branch: refs/heads/trunk
Commit: 0c8a43b9ec77b3ba7b2bb4c8aa863b1deba3bc7b
Parents: 144a55f
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Sep 14 10:08:06 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Sep 14 10:08:06 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/scm/block/BlockManagerImpl.java | 229 ++++++++++---------
 .../scm/container/states/ContainerStateMap.java | 177 ++++++++------
 2 files changed, 226 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8a43b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 82d9a28..e4e33c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -45,8 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CHILL_MODE_EXCEPTION;
@@ -72,7 +71,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final NodeManager nodeManager;
   private final Mapping containerManager;
 
-  private final Lock lock;
+  private final ReadWriteLock lock;
   private final long containerSize;
 
   private final DeletedBlockLog deletedBlockLog;
@@ -108,7 +107,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
             ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
             ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
     rand = new Random();
-    this.lock = new ReentrantLock();
+    this.lock = new ReentrantReadWriteLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
@@ -155,29 +154,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    * @param factor - how many copies needed for this container.
    * @throws IOException
    */
-  private void preAllocateContainers(int count, ReplicationType type,
-      ReplicationFactor factor, String owner)
+  private synchronized void preAllocateContainers(int count,
+      ReplicationType type, ReplicationFactor factor, String owner)
       throws IOException {
-    lock.lock();
-    try {
-      for (int i = 0; i < count; i++) {
-        ContainerWithPipeline containerWithPipeline;
-        try {
-          // TODO: Fix this later when Ratis is made the Default.
-          containerWithPipeline = containerManager.allocateContainer(
-              type, factor, owner);
+    for (int i = 0; i < count; i++) {
+      ContainerWithPipeline containerWithPipeline;
+      try {
+        // TODO: Fix this later when Ratis is made the Default.
+        containerWithPipeline = containerManager.allocateContainer(
+            type, factor, owner);
 
-          if (containerWithPipeline == null) {
-            LOG.warn("Unable to allocate container.");
-            continue;
-          }
-        } catch (IOException ex) {
-          LOG.warn("Unable to allocate container: {}", ex);
-          continue;
+        if (containerWithPipeline == null) {
+          LOG.warn("Unable to allocate container.");
         }
+      } catch (IOException ex) {
+        LOG.warn("Unable to allocate container: {}", ex);
       }
-    } finally {
-      lock.unlock();
     }
   }
 
@@ -208,46 +200,61 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
-    try {
-      /*
-               Here is the high level logic.
-
-               1. First we check if there are containers in ALLOCATED state,
-               that is
-                SCM has allocated them in the SCM namespace but the
-                corresponding
-                container has not been created in the Datanode yet. If we
-                have any
-                in that state, we will return that to the client, which allows
-                client to finish creating those containers. This is a sort of
-                 greedy
-                 algorithm, our primary purpose is to get as many containers as
-                 possible.
-
-                2. If there are no allocated containers -- Then we find a Open
-                container that matches that pattern.
-
-                3. If both of them fail, the we will pre-allocate a bunch of
-                conatainers in SCM and try again.
-
-               TODO : Support random picking of two containers from the list.
-                So we
-               can use different kind of policies.
-      */
+    /*
+      Here is the high level logic.
 
-      ContainerWithPipeline containerWithPipeline;
+      1. First we check if there are containers in ALLOCATED state, that is
+         SCM has allocated them in the SCM namespace but the corresponding
+         container has not been created in the Datanode yet. If we have any in
+         that state, we will return that to the client, which allows client to
+         finish creating those containers. This is a sort of greedy algorithm,
+         our primary purpose is to get as many containers as possible.
 
-      // Look for ALLOCATED container that matches all other parameters.
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
-              HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
+      2. If there are no allocated containers -- then we find an Open container
+         that matches that pattern.
+
+      3. If both of them fail, then we will pre-allocate a bunch of containers
+         in SCM and try again.
+
+      TODO : Support random picking of two containers from the list. So we can
+             use different kind of policies.
+    */
+
+    ContainerWithPipeline containerWithPipeline;
+
+    lock.readLock().lock();
+    try {
+      // This is to optimize performance, if the below condition is evaluated
+      // to false, then we can be sure that there are no containers in
+      // ALLOCATED state.
+      // This can result in a false positive, but never in a false negative.
+      // How can this result in false positive? We check if there are any
+      // containers in ALLOCATED state, this check doesn't care about the
+      // USER of the containers. So there might be cases where a different
+      // USER has few containers in ALLOCATED state, which will result in
+      // false positive.
+      if (!containerManager.getStateManager().getContainerStateMap()
+          .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+          .isEmpty()) {
+        // Since the above check can result in false positive, we have to do
+        // the actual check and find out if there are containers in ALLOCATED
+        // state matching our criteria.
+        synchronized (this) {
+          // Using containers from ALLOCATED state should be done within
+          // synchronized block (or) write lock. Since we already hold a
+          // read lock, we will end up in deadlock situation if we take
+          // write lock here.
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+          if (containerWithPipeline != null) {
+            containerManager.updateContainerState(
+                containerWithPipeline.getContainerInfo().getContainerID(),
+                HddsProtos.LifeCycleEvent.CREATE);
+            return newBlock(containerWithPipeline,
+                HddsProtos.LifeCycleState.ALLOCATED);
+          }
+        }
       }
 
       // Since we found no allocated containers that match our criteria, let us
@@ -263,20 +270,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       // that most of our containers are full or we have not allocated
       // containers of the type and replication factor. So let us go and
       // allocate some.
-      preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
 
-      // Since we just allocated a set of containers this should work
-      containerWithPipeline = containerManager
-          .getMatchingContainerWithPipeline(size, owner, type, factor,
+      // Even though we have already checked the containers in ALLOCATED
+      // state, we have to check again as we only hold a read lock.
+      // Some other thread might have pre-allocated container in meantime.
+      synchronized (this) {
+        if (!containerManager.getStateManager().getContainerStateMap()
+            .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+            .isEmpty()) {
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+        if (containerWithPipeline == null) {
+          preAllocateContainers(containerProvisionBatchSize,
+              type, factor, owner);
+          containerWithPipeline = containerManager
+              .getMatchingContainerWithPipeline(size, owner, type, factor,
+                  HddsProtos.LifeCycleState.ALLOCATED);
+        }
+
+        if (containerWithPipeline != null) {
+          containerManager.updateContainerState(
+              containerWithPipeline.getContainerInfo().getContainerID(),
+              HddsProtos.LifeCycleEvent.CREATE);
+          return newBlock(containerWithPipeline,
               HddsProtos.LifeCycleState.ALLOCATED);
-      if (containerWithPipeline != null) {
-        containerManager.updateContainerState(
-            containerWithPipeline.getContainerInfo().getContainerID(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        return newBlock(containerWithPipeline,
-            HddsProtos.LifeCycleState.ALLOCATED);
+        }
       }
-
       // we have tried all strategies we know and but somehow we are not able
       // to get a container for this block. Log that info and return a null.
       LOG.error(
@@ -286,19 +307,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           type,
           factor);
       return null;
-    } finally {
-      lock.unlock();
-    }
-  }
 
-  private String getChannelName(ReplicationType type) {
-    switch (type) {
-    case RATIS:
-      return "RA" + UUID.randomUUID().toString().substring(3);
-    case STAND_ALONE:
-      return "SA" + UUID.randomUUID().toString().substring(3);
-    default:
-      return "RA" + UUID.randomUUID().toString().substring(3);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -353,40 +364,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           CHILL_MODE_EXCEPTION);
     }
 
-    lock.lock();
     LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
-    try {
-      for (BlockID block : blockIDs) {
-        // Merge blocks to a container to blocks mapping,
-        // prepare to persist this info to the deletedBlocksLog.
-        long containerID = block.getContainerID();
-        if (containerBlocks.containsKey(containerID)) {
-          containerBlocks.get(containerID).add(block.getLocalID());
-        } else {
-          List<Long> item = new ArrayList<>();
-          item.add(block.getLocalID());
-          containerBlocks.put(containerID, item);
-        }
+    for (BlockID block : blockIDs) {
+      // Merge blocks to a container to blocks mapping,
+      // prepare to persist this info to the deletedBlocksLog.
+      long containerID = block.getContainerID();
+      if (containerBlocks.containsKey(containerID)) {
+        containerBlocks.get(containerID).add(block.getLocalID());
+      } else {
+        List<Long> item = new ArrayList<>();
+        item.add(block.getLocalID());
+        containerBlocks.put(containerID, item);
       }
+    }
 
-      try {
-        deletedBlockLog.addTransactions(containerBlocks);
-      } catch (IOException e) {
-        throw new IOException(
-            "Skip writing the deleted blocks info to"
-                + " the delLog because addTransaction fails. Batch skipped: "
-                + StringUtils.join(",", blockIDs),
-            e);
-      }
-      // TODO: Container report handling of the deleted blocks:
-      // Remove tombstone and update open container usage.
-      // We will revisit this when the closed container replication is done.
-    } finally {
-      lock.unlock();
+    try {
+      deletedBlockLog.addTransactions(containerBlocks);
+    } catch (IOException e) {
+      throw new IOException(
+          "Skip writing the deleted blocks info to"
+              + " the delLog because addTransaction fails. Batch skipped: "
+              + StringUtils.join(",", blockIDs), e);
     }
+    // TODO: Container report handling of the deleted blocks:
+    // Remove tombstone and update open container usage.
+    // We will revisit this when the closed container replication is done.
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8a43b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 4d34cb7..8cb65cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +39,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CONTAINER_EXISTS;
@@ -108,7 +109,7 @@ public class ContainerStateMap {
   // Container State Map lock should be held before calling into
   // Update ContainerAttributes. The consistency of ContainerAttributes is
   // protected by this lock.
-  private final AutoCloseableLock autoLock;
+  private final ReadWriteLock lock;
 
   /**
    * Create a ContainerStateMap.
@@ -120,7 +121,7 @@ public class ContainerStateMap {
     typeMap = new ContainerAttribute<>();
     openPipelineMap = new ContainerAttribute<>();
     containerMap = new HashMap<>();
-    autoLock = new AutoCloseableLock();
+    lock = new ReentrantReadWriteLock();
     contReplicaMap = new HashMap<>();
 //        new InstrumentedLock(getClass().getName(), LOG,
 //            new ReentrantLock(),
@@ -140,7 +141,8 @@ public class ContainerStateMap {
     Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
         "ExpectedReplicaCount should be greater than 0");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       ContainerID id = ContainerID.valueof(info.getContainerID());
       if (containerMap.putIfAbsent(id, info) != null) {
         LOG.debug("Duplicate container ID detected. {}", id);
@@ -157,6 +159,8 @@ public class ContainerStateMap {
         openPipelineMap.insert(info.getPipelineID(), id);
       }
       LOG.trace("Created container with {} successfully.", id);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -177,8 +181,13 @@ public class ContainerStateMap {
    * @return container info, if found.
    */
   public ContainerInfo getContainerInfo(long containerID) {
-    ContainerID id = new ContainerID(containerID);
-    return containerMap.get(id);
+    lock.readLock().lock();
+    try {
+      ContainerID id = new ContainerID(containerID);
+      return containerMap.get(id);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   /**
@@ -191,11 +200,14 @@ public class ContainerStateMap {
   public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
       throws SCMException {
     Preconditions.checkNotNull(containerID);
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return Collections
             .unmodifiableSet(contReplicaMap.get(containerID));
       }
+    } finally {
+      lock.readLock().unlock();
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -213,8 +225,8 @@ public class ContainerStateMap {
   public void addContainerReplica(ContainerID containerID,
       DatanodeDetails... dnList) {
     Preconditions.checkNotNull(containerID);
-    // Take lock to avoid race condition around insertion.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       for (DatanodeDetails dn : dnList) {
         Preconditions.checkNotNull(dn);
         if (contReplicaMap.containsKey(containerID)) {
@@ -228,6 +240,8 @@ public class ContainerStateMap {
           contReplicaMap.put(containerID, dnSet);
         }
       }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -243,11 +257,13 @@ public class ContainerStateMap {
     Preconditions.checkNotNull(containerID);
     Preconditions.checkNotNull(dn);
 
-    // Take lock to avoid race condition.
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       if (contReplicaMap.containsKey(containerID)) {
         return contReplicaMap.get(containerID).remove(dn);
       }
+    } finally {
+      lock.writeLock().unlock();
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
@@ -265,8 +281,11 @@ public class ContainerStateMap {
    * @return - Map
    */
   public Map<ContainerID, ContainerInfo> getContainerMap() {
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return Collections.unmodifiableMap(containerMap);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -277,7 +296,8 @@ public class ContainerStateMap {
   public void updateContainerInfo(ContainerInfo info) throws SCMException {
     Preconditions.checkNotNull(info);
     ContainerInfo currentInfo = null;
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.writeLock().lock();
+    try {
       currentInfo = containerMap.get(
           ContainerID.valueof(info.getContainerID()));
 
@@ -285,6 +305,8 @@ public class ContainerStateMap {
         throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
       }
       containerMap.put(info.containerID(), info);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -304,51 +326,56 @@ public class ContainerStateMap {
     ContainerID id = new ContainerID(info.getContainerID());
     ContainerInfo currentInfo = null;
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
-      currentInfo = containerMap.get(id);
+    lock.writeLock().lock();
+    try {
+      try {
+        currentInfo = containerMap.get(id);
 
-      if (currentInfo == null) {
-        throw new
-            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        if (currentInfo == null) {
+          throw new
+              SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+        }
+        // We are updating two places before this update is done. These can
+        // fail independently, so the code needs to handle it.
+
+        // We update the attribute map, if that fails it will throw an
+        // exception, so no issues, if we are successful, we keep track of the
+        // fact that we have updated the lifecycle state in the map, and update
+        // the container state. If this second update fails, we will attempt to
+        // roll back the earlier change we did. If the rollback fails, we can
+        // be in an inconsistent state.
+
+        info.setState(newState);
+        containerMap.put(id, info);
+        lifeCycleStateMap.update(currentState, newState, id);
+        LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+            "{}", id, currentState, newState);
+      } catch (SCMException ex) {
+        LOG.error("Unable to update the container state. {}", ex);
+        // we need to revert the change in this attribute since we are not
+        // able to update the hash table.
+        LOG.info("Reverting the update to lifecycle state. Moving back to " +
+                "old state. Old = {}, Attempted state = {}", currentState,
+            newState);
+
+        containerMap.put(id, currentInfo);
+
+        // if this line throws, the state map can be in an inconsistent
+        // state, since we will have modified the attribute but the
+        // container state will not be in sync since we were not able to put
+        // that into the hash table.
+        lifeCycleStateMap.update(newState, currentState, id);
+
+        throw new SCMException("Updating the container map failed.", ex,
+            FAILED_TO_CHANGE_CONTAINER_STATE);
       }
-      // We are updating two places before this update is done, these can
-      // fail independently, since the code needs to handle it.
-
-      // We update the attribute map, if that fails it will throw an exception,
-      // so no issues, if we are successful, we keep track of the fact that we
-      // have updated the lifecycle state in the map, and update the container
-      // state. If this second update fails, we will attempt to roll back the
-      // earlier change we did. If the rollback fails, we can be in an
-      // inconsistent state,
-
-      info.setState(newState);
-      containerMap.put(id, info);
-      lifeCycleStateMap.update(currentState, newState, id);
-      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-          "{}", id, currentState, newState);
-    } catch (SCMException ex) {
-      LOG.error("Unable to update the container state. {}", ex);
-      // we need to revert the change in this attribute since we are not
-      // able to update the hash table.
-      LOG.info("Reverting the update to lifecycle state. Moving back to " +
-              "old state. Old = {}, Attempted state = {}", currentState,
-          newState);
-
-      containerMap.put(id, currentInfo);
-
-      // if this line throws, the state map can be in an inconsistent
-      // state, since we will have modified the attribute by the
-      // container state will not in sync since we were not able to put
-      // that into the hash table.
-      lifeCycleStateMap.update(newState, currentState, id);
-
-      throw new SCMException("Updating the container map failed.", ex,
-          FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-    // In case the container is set to closed state, it needs to be removed from
-    // the pipeline Map.
-    if (!info.isContainerOpen()) {
-      openPipelineMap.remove(info.getPipelineID(), id);
+      // In case the container is set to closed state, it needs to be removed
+      // from the pipeline Map.
+      if (!info.isContainerOpen()) {
+        openPipelineMap.remove(info.getPipelineID(), id);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
@@ -360,9 +387,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
     Preconditions.checkNotNull(ownerName);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return ownerMap.getCollection(ownerName);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -374,9 +403,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
     Preconditions.checkNotNull(type);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return typeMap.getCollection(type);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -389,9 +420,11 @@ public class ContainerStateMap {
   public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
       PipelineID pipelineID) {
     Preconditions.checkNotNull(pipelineID);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return openPipelineMap.getCollection(pipelineID);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -403,9 +436,11 @@ public class ContainerStateMap {
    */
   NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
     Preconditions.checkNotNull(factor);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return factorMap.getCollection(factor);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -415,11 +450,14 @@ public class ContainerStateMap {
    * @param state - State - Open, Closed etc.
    * @return List of containers by state.
    */
-  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+  public NavigableSet<ContainerID> getContainerIDsByState(
+      LifeCycleState state) {
     Preconditions.checkNotNull(state);
-
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
       return lifeCycleStateMap.getCollection(state);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -441,7 +479,8 @@ public class ContainerStateMap {
     Preconditions.checkNotNull(factor, "Factor cannot be null");
     Preconditions.checkNotNull(type, "Type cannot be null");
 
-    try (AutoCloseableLock lock = autoLock.acquire()) {
+    lock.readLock().lock();
+    try {
 
       // If we cannot meet any one condition we return EMPTY_SET immediately.
       // Since when we intersect these sets, the result will be empty if any
@@ -479,6 +518,8 @@ public class ContainerStateMap {
         currentSet = intersectSets(currentSet, sets[x]);
       }
       return currentSet;
+    } finally {
+      lock.readLock().unlock();
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org