You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/09/14 18:36:58 UTC
hadoop git commit: HDDS-429. StorageContainerManager lock
optimization. Contributed by Nanda Kumar.
Repository: hadoop
Updated Branches:
refs/heads/trunk 144a55f0e -> 0c8a43b9e
HDDS-429. StorageContainerManager lock optimization.
Contributed by Nanda Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c8a43b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c8a43b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c8a43b9
Branch: refs/heads/trunk
Commit: 0c8a43b9ec77b3ba7b2bb4c8aa863b1deba3bc7b
Parents: 144a55f
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Sep 14 10:08:06 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Sep 14 10:08:06 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 229 ++++++++++---------
.../scm/container/states/ContainerStateMap.java | 177 ++++++++------
2 files changed, 226 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8a43b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 82d9a28..e4e33c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.block;
-import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -45,8 +44,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.CHILL_MODE_EXCEPTION;
@@ -72,7 +71,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private final NodeManager nodeManager;
private final Mapping containerManager;
- private final Lock lock;
+ private final ReadWriteLock lock;
private final long containerSize;
private final DeletedBlockLog deletedBlockLog;
@@ -108,7 +107,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
rand = new Random();
- this.lock = new ReentrantLock();
+ this.lock = new ReentrantReadWriteLock();
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
@@ -155,29 +154,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
* @param factor - how many copies needed for this container.
* @throws IOException
*/
- private void preAllocateContainers(int count, ReplicationType type,
- ReplicationFactor factor, String owner)
+ private synchronized void preAllocateContainers(int count,
+ ReplicationType type, ReplicationFactor factor, String owner)
throws IOException {
- lock.lock();
- try {
- for (int i = 0; i < count; i++) {
- ContainerWithPipeline containerWithPipeline;
- try {
- // TODO: Fix this later when Ratis is made the Default.
- containerWithPipeline = containerManager.allocateContainer(
- type, factor, owner);
+ for (int i = 0; i < count; i++) {
+ ContainerWithPipeline containerWithPipeline;
+ try {
+ // TODO: Fix this later when Ratis is made the Default.
+ containerWithPipeline = containerManager.allocateContainer(
+ type, factor, owner);
- if (containerWithPipeline == null) {
- LOG.warn("Unable to allocate container.");
- continue;
- }
- } catch (IOException ex) {
- LOG.warn("Unable to allocate container: {}", ex);
- continue;
+ if (containerWithPipeline == null) {
+ LOG.warn("Unable to allocate container.");
}
+ } catch (IOException ex) {
+ LOG.warn("Unable to allocate container: {}", ex);
}
- } finally {
- lock.unlock();
}
}
@@ -208,46 +200,61 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
CHILL_MODE_EXCEPTION);
}
- lock.lock();
- try {
- /*
- Here is the high level logic.
-
- 1. First we check if there are containers in ALLOCATED state,
- that is
- SCM has allocated them in the SCM namespace but the
- corresponding
- container has not been created in the Datanode yet. If we
- have any
- in that state, we will return that to the client, which allows
- client to finish creating those containers. This is a sort of
- greedy
- algorithm, our primary purpose is to get as many containers as
- possible.
-
- 2. If there are no allocated containers -- Then we find a Open
- container that matches that pattern.
-
- 3. If both of them fail, the we will pre-allocate a bunch of
- conatainers in SCM and try again.
-
- TODO : Support random picking of two containers from the list.
- So we
- can use different kind of policies.
- */
+ /*
+ Here is the high level logic.
- ContainerWithPipeline containerWithPipeline;
+ 1. First we check if there are containers in ALLOCATED state, that is
+ SCM has allocated them in the SCM namespace but the corresponding
+ container has not been created in the Datanode yet. If we have any in
+ that state, we will return that to the client, which allows client to
+ finish creating those containers. This is a sort of greedy algorithm,
+ our primary purpose is to get as many containers as possible.
- // Look for ALLOCATED container that matches all other parameters.
- containerWithPipeline = containerManager
- .getMatchingContainerWithPipeline(size, owner, type, factor,
- HddsProtos.LifeCycleState.ALLOCATED);
- if (containerWithPipeline != null) {
- containerManager.updateContainerState(
- containerWithPipeline.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerWithPipeline,
- HddsProtos.LifeCycleState.ALLOCATED);
+ 2. If there are no allocated containers -- Then we find a Open container
+ that matches that pattern.
+
+ 3. If both of them fail, then we will pre-allocate a bunch of containers
+ in SCM and try again.
+
+ TODO : Support random picking of two containers from the list. So we can
+ use different kind of policies.
+ */
+
+ ContainerWithPipeline containerWithPipeline;
+
+ lock.readLock().lock();
+ try {
+ // This is to optimize performance, if the below condition is evaluated
+ // to false, then we can be sure that there are no containers in
+ // ALLOCATED state.
+ // This can result in false positive, but it will never be false negative.
+ // How can this result in false positive? We check if there are any
+ // containers in ALLOCATED state, this check doesn't care about the
+ // USER of the containers. So there might be cases where a different
+ // USER has few containers in ALLOCATED state, which will result in
+ // false positive.
+ if (!containerManager.getStateManager().getContainerStateMap()
+ .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+ .isEmpty()) {
+ // Since the above check can result in false positive, we have to do
+ // the actual check and find out if there are containers in ALLOCATED
+ // state matching our criteria.
+ synchronized (this) {
+ // Using containers from ALLOCATED state should be done within
+ // synchronized block (or) write lock. Since we already hold a
+ // read lock, we will end up in deadlock situation if we take
+ // write lock here.
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ return newBlock(containerWithPipeline,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ }
+ }
}
// Since we found no allocated containers that match our criteria, let us
@@ -263,20 +270,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// that most of our containers are full or we have not allocated
// containers of the type and replication factor. So let us go and
// allocate some.
- preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
- // Since we just allocated a set of containers this should work
- containerWithPipeline = containerManager
- .getMatchingContainerWithPipeline(size, owner, type, factor,
+ // Even though we have already checked the containers in ALLOCATED
+ // state, we have to check again as we only hold a read lock.
+ // Some other thread might have pre-allocated container in meantime.
+ synchronized (this) {
+ if (!containerManager.getStateManager().getContainerStateMap()
+ .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED)
+ .isEmpty()) {
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ }
+ if (containerWithPipeline == null) {
+ preAllocateContainers(containerProvisionBatchSize,
+ type, factor, owner);
+ containerWithPipeline = containerManager
+ .getMatchingContainerWithPipeline(size, owner, type, factor,
+ HddsProtos.LifeCycleState.ALLOCATED);
+ }
+
+ if (containerWithPipeline != null) {
+ containerManager.updateContainerState(
+ containerWithPipeline.getContainerInfo().getContainerID(),
+ HddsProtos.LifeCycleEvent.CREATE);
+ return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
- if (containerWithPipeline != null) {
- containerManager.updateContainerState(
- containerWithPipeline.getContainerInfo().getContainerID(),
- HddsProtos.LifeCycleEvent.CREATE);
- return newBlock(containerWithPipeline,
- HddsProtos.LifeCycleState.ALLOCATED);
+ }
}
-
// we have tried all strategies we know and but somehow we are not able
// to get a container for this block. Log that info and return a null.
LOG.error(
@@ -286,19 +307,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
type,
factor);
return null;
- } finally {
- lock.unlock();
- }
- }
- private String getChannelName(ReplicationType type) {
- switch (type) {
- case RATIS:
- return "RA" + UUID.randomUUID().toString().substring(3);
- case STAND_ALONE:
- return "SA" + UUID.randomUUID().toString().substring(3);
- default:
- return "RA" + UUID.randomUUID().toString().substring(3);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -353,40 +364,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
CHILL_MODE_EXCEPTION);
}
- lock.lock();
LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
Map<Long, List<Long>> containerBlocks = new HashMap<>();
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
- try {
- for (BlockID block : blockIDs) {
- // Merge blocks to a container to blocks mapping,
- // prepare to persist this info to the deletedBlocksLog.
- long containerID = block.getContainerID();
- if (containerBlocks.containsKey(containerID)) {
- containerBlocks.get(containerID).add(block.getLocalID());
- } else {
- List<Long> item = new ArrayList<>();
- item.add(block.getLocalID());
- containerBlocks.put(containerID, item);
- }
+ for (BlockID block : blockIDs) {
+ // Merge blocks to a container to blocks mapping,
+ // prepare to persist this info to the deletedBlocksLog.
+ long containerID = block.getContainerID();
+ if (containerBlocks.containsKey(containerID)) {
+ containerBlocks.get(containerID).add(block.getLocalID());
+ } else {
+ List<Long> item = new ArrayList<>();
+ item.add(block.getLocalID());
+ containerBlocks.put(containerID, item);
}
+ }
- try {
- deletedBlockLog.addTransactions(containerBlocks);
- } catch (IOException e) {
- throw new IOException(
- "Skip writing the deleted blocks info to"
- + " the delLog because addTransaction fails. Batch skipped: "
- + StringUtils.join(",", blockIDs),
- e);
- }
- // TODO: Container report handling of the deleted blocks:
- // Remove tombstone and update open container usage.
- // We will revisit this when the closed container replication is done.
- } finally {
- lock.unlock();
+ try {
+ deletedBlockLog.addTransactions(containerBlocks);
+ } catch (IOException e) {
+ throw new IOException(
+ "Skip writing the deleted blocks info to"
+ + " the delLog because addTransaction fails. Batch skipped: "
+ + StringUtils.join(",", blockIDs), e);
}
+ // TODO: Container report handling of the deleted blocks:
+ // Remove tombstone and update open container usage.
+ // We will revisit this when the closed container replication is done.
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c8a43b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 4d34cb7..8cb65cf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +39,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.CONTAINER_EXISTS;
@@ -108,7 +109,7 @@ public class ContainerStateMap {
// Container State Map lock should be held before calling into
// Update ContainerAttributes. The consistency of ContainerAttributes is
// protected by this lock.
- private final AutoCloseableLock autoLock;
+ private final ReadWriteLock lock;
/**
* Create a ContainerStateMap.
@@ -120,7 +121,7 @@ public class ContainerStateMap {
typeMap = new ContainerAttribute<>();
openPipelineMap = new ContainerAttribute<>();
containerMap = new HashMap<>();
- autoLock = new AutoCloseableLock();
+ lock = new ReentrantReadWriteLock();
contReplicaMap = new HashMap<>();
// new InstrumentedLock(getClass().getName(), LOG,
// new ReentrantLock(),
@@ -140,7 +141,8 @@ public class ContainerStateMap {
Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
"ExpectedReplicaCount should be greater than 0");
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.writeLock().lock();
+ try {
ContainerID id = ContainerID.valueof(info.getContainerID());
if (containerMap.putIfAbsent(id, info) != null) {
LOG.debug("Duplicate container ID detected. {}", id);
@@ -157,6 +159,8 @@ public class ContainerStateMap {
openPipelineMap.insert(info.getPipelineID(), id);
}
LOG.trace("Created container with {} successfully.", id);
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -177,8 +181,13 @@ public class ContainerStateMap {
* @return container info, if found.
*/
public ContainerInfo getContainerInfo(long containerID) {
- ContainerID id = new ContainerID(containerID);
- return containerMap.get(id);
+ lock.readLock().lock();
+ try {
+ ContainerID id = new ContainerID(containerID);
+ return containerMap.get(id);
+ } finally {
+ lock.readLock().unlock();
+ }
}
/**
@@ -191,11 +200,14 @@ public class ContainerStateMap {
public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
throws SCMException {
Preconditions.checkNotNull(containerID);
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
if (contReplicaMap.containsKey(containerID)) {
return Collections
.unmodifiableSet(contReplicaMap.get(containerID));
}
+ } finally {
+ lock.readLock().unlock();
}
throw new SCMException(
"No entry exist for containerId: " + containerID + " in replica map.",
@@ -213,8 +225,8 @@ public class ContainerStateMap {
public void addContainerReplica(ContainerID containerID,
DatanodeDetails... dnList) {
Preconditions.checkNotNull(containerID);
- // Take lock to avoid race condition around insertion.
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.writeLock().lock();
+ try {
for (DatanodeDetails dn : dnList) {
Preconditions.checkNotNull(dn);
if (contReplicaMap.containsKey(containerID)) {
@@ -228,6 +240,8 @@ public class ContainerStateMap {
contReplicaMap.put(containerID, dnSet);
}
}
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -243,11 +257,13 @@ public class ContainerStateMap {
Preconditions.checkNotNull(containerID);
Preconditions.checkNotNull(dn);
- // Take lock to avoid race condition.
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.writeLock().lock();
+ try {
if (contReplicaMap.containsKey(containerID)) {
return contReplicaMap.get(containerID).remove(dn);
}
+ } finally {
+ lock.writeLock().unlock();
}
throw new SCMException(
"No entry exist for containerId: " + containerID + " in replica map.",
@@ -265,8 +281,11 @@ public class ContainerStateMap {
* @return - Map
*/
public Map<ContainerID, ContainerInfo> getContainerMap() {
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return Collections.unmodifiableMap(containerMap);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -277,7 +296,8 @@ public class ContainerStateMap {
public void updateContainerInfo(ContainerInfo info) throws SCMException {
Preconditions.checkNotNull(info);
ContainerInfo currentInfo = null;
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.writeLock().lock();
+ try {
currentInfo = containerMap.get(
ContainerID.valueof(info.getContainerID()));
@@ -285,6 +305,8 @@ public class ContainerStateMap {
throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
}
containerMap.put(info.containerID(), info);
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -304,51 +326,56 @@ public class ContainerStateMap {
ContainerID id = new ContainerID(info.getContainerID());
ContainerInfo currentInfo = null;
- try (AutoCloseableLock lock = autoLock.acquire()) {
- currentInfo = containerMap.get(id);
+ lock.writeLock().lock();
+ try {
+ try {
+ currentInfo = containerMap.get(id);
- if (currentInfo == null) {
- throw new
- SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+ if (currentInfo == null) {
+ throw new
+ SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+ }
+ // We are updating two places before this update is done; these can
+ // fail independently, so the code needs to handle it.
+
+ // We update the attribute map, if that fails it will throw an
+ // exception, so no issues, if we are successful, we keep track of the
+ // fact that we have updated the lifecycle state in the map, and update
+ // the container state. If this second update fails, we will attempt to
+ // roll back the earlier change we did. If the rollback fails, we can
+ // be in an inconsistent state.
+
+ info.setState(newState);
+ containerMap.put(id, info);
+ lifeCycleStateMap.update(currentState, newState, id);
+ LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+ "{}", id, currentState, newState);
+ } catch (SCMException ex) {
+ LOG.error("Unable to update the container state. {}", ex);
+ // we need to revert the change in this attribute since we are not
+ // able to update the hash table.
+ LOG.info("Reverting the update to lifecycle state. Moving back to " +
+ "old state. Old = {}, Attempted state = {}", currentState,
+ newState);
+
+ containerMap.put(id, currentInfo);
+
+ // If this line throws, the state map can be in an inconsistent
+ // state, since we will have modified the attribute but the
+ // container state will not be in sync since we were not able to put
+ // that into the hash table.
+ lifeCycleStateMap.update(newState, currentState, id);
+
+ throw new SCMException("Updating the container map failed.", ex,
+ FAILED_TO_CHANGE_CONTAINER_STATE);
}
- // We are updating two places before this update is done, these can
- // fail independently, since the code needs to handle it.
-
- // We update the attribute map, if that fails it will throw an exception,
- // so no issues, if we are successful, we keep track of the fact that we
- // have updated the lifecycle state in the map, and update the container
- // state. If this second update fails, we will attempt to roll back the
- // earlier change we did. If the rollback fails, we can be in an
- // inconsistent state,
-
- info.setState(newState);
- containerMap.put(id, info);
- lifeCycleStateMap.update(currentState, newState, id);
- LOG.trace("Updated the container {} to new state. Old = {}, new = " +
- "{}", id, currentState, newState);
- } catch (SCMException ex) {
- LOG.error("Unable to update the container state. {}", ex);
- // we need to revert the change in this attribute since we are not
- // able to update the hash table.
- LOG.info("Reverting the update to lifecycle state. Moving back to " +
- "old state. Old = {}, Attempted state = {}", currentState,
- newState);
-
- containerMap.put(id, currentInfo);
-
- // if this line throws, the state map can be in an inconsistent
- // state, since we will have modified the attribute by the
- // container state will not in sync since we were not able to put
- // that into the hash table.
- lifeCycleStateMap.update(newState, currentState, id);
-
- throw new SCMException("Updating the container map failed.", ex,
- FAILED_TO_CHANGE_CONTAINER_STATE);
- }
- // In case the container is set to closed state, it needs to be removed from
- // the pipeline Map.
- if (!info.isContainerOpen()) {
- openPipelineMap.remove(info.getPipelineID(), id);
+ // In case the container is set to closed state, it needs to be removed
+ // from the pipeline Map.
+ if (!info.isContainerOpen()) {
+ openPipelineMap.remove(info.getPipelineID(), id);
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -360,9 +387,11 @@ public class ContainerStateMap {
*/
NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
Preconditions.checkNotNull(ownerName);
-
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return ownerMap.getCollection(ownerName);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -374,9 +403,11 @@ public class ContainerStateMap {
*/
NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
Preconditions.checkNotNull(type);
-
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return typeMap.getCollection(type);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -389,9 +420,11 @@ public class ContainerStateMap {
public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
PipelineID pipelineID) {
Preconditions.checkNotNull(pipelineID);
-
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return openPipelineMap.getCollection(pipelineID);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -403,9 +436,11 @@ public class ContainerStateMap {
*/
NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
Preconditions.checkNotNull(factor);
-
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return factorMap.getCollection(factor);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -415,11 +450,14 @@ public class ContainerStateMap {
* @param state - State - Open, Closed etc.
* @return List of containers by state.
*/
- NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+ public NavigableSet<ContainerID> getContainerIDsByState(
+ LifeCycleState state) {
Preconditions.checkNotNull(state);
-
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
return lifeCycleStateMap.getCollection(state);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -441,7 +479,8 @@ public class ContainerStateMap {
Preconditions.checkNotNull(factor, "Factor cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
- try (AutoCloseableLock lock = autoLock.acquire()) {
+ lock.readLock().lock();
+ try {
// If we cannot meet any one condition we return EMPTY_SET immediately.
// Since when we intersect these sets, the result will be empty if any
@@ -479,6 +518,8 @@ public class ContainerStateMap {
currentSet = intersectSets(currentSet, sets[x]);
}
return currentSet;
+ } finally {
+ lock.readLock().unlock();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org