You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/06/28 21:05:29 UTC

[hadoop] branch trunk updated: HDDS-1672. Improve locking in OzoneManager. (#1016)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49c5e8a  HDDS-1672. Improve locking in OzoneManager. (#1016)
49c5e8a is described below

commit 49c5e8ac249981b533763d1523e72872748e3f79
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Fri Jun 28 14:05:19 2019 -0700

    HDDS-1672. Improve locking in OzoneManager. (#1016)
---
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   1 +
 .../apache/hadoop/ozone/om/OzoneManagerLock.java   | 269 ---------------------
 .../hadoop/ozone/om/S3SecretManagerImpl.java       |   9 +-
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     |  54 +++--
 .../hadoop/ozone/om/lock/OzoneManagerLockUtil.java |  10 +-
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java |  25 +-
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |  47 ++--
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 100 ++++----
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   3 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  18 +-
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  |  21 +-
 .../apache/hadoop/ozone/om/S3BucketManager.java    |   6 +
 .../hadoop/ozone/om/S3BucketManagerImpl.java       |  21 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |  88 ++++---
 .../om/request/bucket/OMBucketCreateRequest.java   |  17 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |   8 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |   8 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  10 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   8 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |   7 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |   7 +-
 .../om/request/volume/OMVolumeCreateRequest.java   |  18 +-
 .../om/request/volume/OMVolumeDeleteRequest.java   |  44 ++--
 .../om/request/volume/OMVolumeSetOwnerRequest.java |  29 +--
 .../om/request/volume/OMVolumeSetQuotaRequest.java |   6 +-
 .../hadoop/ozone/om/TestOzoneManagerLock.java      | 193 ---------------
 26 files changed, 324 insertions(+), 703 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 34d81cf..dbac5f3 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.utils.db.DBStore;
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
deleted file mode 100644
index c569c09..0000000
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.lock.LockManager;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
-
-/**
- * Provides different locks to handle concurrency in OzoneMaster.
- * We also maintain lock hierarchy, based on the weight.
- *
- * <table>
- *   <caption></caption>
- *   <tr>
- *     <td><b> WEIGHT </b></td> <td><b> LOCK </b></td>
- *   </tr>
- *   <tr>
- *     <td> 0 </td> <td> User Lock </td>
- *   </tr>
- *   <tr>
- *     <td> 1 </td> <td> Volume Lock </td>
- *   </tr>
- *   <tr>
- *     <td> 2 </td> <td> Bucket Lock </td>
- *   </tr>
- *   <tr>
- *     <td> 3 </td> <td> Prefix Lock </td>
- *    </tr>
- * </table>
- *
- * One cannot obtain a lower weight lock while holding a lock with higher
- * weight. The other way around is possible. <br>
- * <br>
- * <p>
- * For example:
- * <br>
- * {@literal ->} acquireVolumeLock (will work)<br>
- *   {@literal +->} acquireBucketLock (will work)<br>
- *     {@literal +-->} acquireUserLock (will throw Exception)<br>
- * </p>
- * <br>
- * To acquire a user lock you should not hold any Volume/Bucket lock. Similarly
- * to acquire a Volume lock you should not hold any Bucket lock.
- */
-public final class OzoneManagerLock {
-
-  private static final String VOLUME_LOCK = "volumeLock";
-  private static final String BUCKET_LOCK = "bucketLock";
-  private static final String PREFIX_LOCK = "prefixLock";
-  private static final String S3_BUCKET_LOCK = "s3BucketLock";
-  private static final String S3_SECRET_LOCK = "s3SecretetLock";
-
-  private final LockManager<String> manager;
-
-  // To maintain locks held by current thread.
-  private final ThreadLocal<Map<String, AtomicInteger>> myLocks =
-      ThreadLocal.withInitial(
-          () -> ImmutableMap.of(
-              VOLUME_LOCK, new AtomicInteger(0),
-              BUCKET_LOCK, new AtomicInteger(0),
-              PREFIX_LOCK, new AtomicInteger(0),
-              S3_BUCKET_LOCK, new AtomicInteger(0),
-              S3_SECRET_LOCK, new AtomicInteger(0)
-          )
-      );
-
-  /**
-   * Creates new OzoneManagerLock instance.
-   * @param conf Configuration object
-   */
-  public OzoneManagerLock(Configuration conf) {
-    manager = new LockManager<>(conf);
-  }
-
-  /**
-   * Acquires user lock on the given resource.
-   *
-   * <p>If the lock is not available then the current thread becomes
-   * disabled for thread scheduling purposes and lies dormant until the
-   * lock has been acquired.
-   *
-   * @param user User on which the lock has to be acquired
-   */
-  public void acquireUserLock(String user) {
-    // Calling thread should not hold any volume or bucket lock.
-    if (hasAnyVolumeLock() || hasAnyBucketLock() || hasAnyS3Lock()) {
-      throw new RuntimeException(
-          "Thread '" + Thread.currentThread().getName() +
-              "' cannot acquire user lock" +
-              " while holding volume, bucket or S3 bucket lock(s).");
-    }
-    manager.lock(OM_USER_PREFIX + user);
-  }
-
-  /**
-   * Releases the user lock on given resource.
-   */
-  public void releaseUserLock(String user) {
-    manager.unlock(OM_USER_PREFIX + user);
-  }
-
-  /**
-   * Acquires volume lock on the given resource.
-   *
-   * <p>If the lock is not available then the current thread becomes
-   * disabled for thread scheduling purposes and lies dormant until the
-   * lock has been acquired.
-   *
-   * @param volume Volume on which the lock has to be acquired
-   */
-  public void acquireVolumeLock(String volume) {
-    // Calling thread should not hold any bucket lock.
-    // You can take an Volume while holding S3 bucket lock, since
-    // semantically an S3 bucket maps to the ozone volume. So we check here
-    // only if ozone bucket lock is taken.
-    if (hasAnyBucketLock()) {
-      throw new RuntimeException(
-          "Thread '" + Thread.currentThread().getName() +
-              "' cannot acquire volume lock while holding bucket lock(s).");
-    }
-    manager.lock(OM_KEY_PREFIX + volume);
-    myLocks.get().get(VOLUME_LOCK).incrementAndGet();
-  }
-
-  /**
-   * Releases the volume lock on given resource.
-   */
-  public void releaseVolumeLock(String volume) {
-    manager.unlock(OM_KEY_PREFIX + volume);
-    myLocks.get().get(VOLUME_LOCK).decrementAndGet();
-  }
-
-  /**
-   * Acquires S3 Bucket lock on the given resource.
-   *
-   * <p>If the lock is not available then the current thread becomes
-   * disabled for thread scheduling purposes and lies dormant until the lock has
-   * been acquired.
-   *
-   * @param s3BucketName S3Bucket Name on which the lock has to be acquired
-   */
-  public void acquireS3Lock(String s3BucketName) {
-    // Calling thread should not hold any bucket lock.
-    // You can take an Volume while holding S3 bucket lock, since
-    // semantically an S3 bucket maps to the ozone volume. So we check here
-    // only if ozone bucket lock is taken.
-    if (hasAnyBucketLock()) {
-      throw new RuntimeException(
-          "Thread '" + Thread.currentThread().getName() +
-              "' cannot acquire S3 bucket lock while holding Ozone bucket " +
-              "lock(s).");
-    }
-    manager.lock(OM_S3_PREFIX + s3BucketName);
-    myLocks.get().get(S3_BUCKET_LOCK).incrementAndGet();
-  }
-
-  /**
-   * Releases the volume lock on given resource.
-   */
-  public void releaseS3Lock(String s3BucketName) {
-    manager.unlock(OM_S3_PREFIX + s3BucketName);
-    myLocks.get().get(S3_BUCKET_LOCK).decrementAndGet();
-  }
-
-  /**
-   * Acquires bucket lock on the given resource.
-   *
-   * <p>If the lock is not available then the current thread becomes
-   * disabled for thread scheduling purposes and lies dormant until the
-   * lock has been acquired.
-   *
-   * @param bucket Bucket on which the lock has to be acquired
-   */
-  public void acquireBucketLock(String volume, String bucket) {
-    manager.lock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
-    myLocks.get().get(BUCKET_LOCK).incrementAndGet();
-  }
-
-  /**
-   * Releases the bucket lock on given resource.
-   */
-  public void releaseBucketLock(String volume, String bucket) {
-    manager.unlock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket);
-    myLocks.get().get(BUCKET_LOCK).decrementAndGet();
-  }
-
-  /**
-   * Returns true if the current thread holds any volume lock.
-   * @return true if current thread holds volume lock, else false
-   */
-  private boolean hasAnyVolumeLock() {
-    return myLocks.get().get(VOLUME_LOCK).get() != 0;
-  }
-
-  /**
-   * Returns true if the current thread holds any bucket lock.
-   * @return true if current thread holds bucket lock, else false
-   */
-  private boolean hasAnyBucketLock() {
-    return myLocks.get().get(BUCKET_LOCK).get() != 0;
-  }
-
-  private boolean hasAnyS3Lock() {
-    return myLocks.get().get(S3_BUCKET_LOCK).get() != 0;
-  }
-
-  public void acquireS3SecretLock(String awsAccessId) {
-    if (hasAnyS3SecretLock()) {
-      throw new RuntimeException(
-          "Thread '" + Thread.currentThread().getName() +
-              "' cannot acquire S3 Secret lock while holding S3 " +
-              "awsAccessKey lock(s).");
-    }
-    manager.lock(awsAccessId);
-    myLocks.get().get(S3_SECRET_LOCK).incrementAndGet();
-  }
-
-  private boolean hasAnyS3SecretLock() {
-    return myLocks.get().get(S3_SECRET_LOCK).get() != 0;
-  }
-
-  public void releaseS3SecretLock(String awsAccessId) {
-    manager.unlock(awsAccessId);
-    myLocks.get().get(S3_SECRET_LOCK).decrementAndGet();
-  }
-
-  public void acquirePrefixLock(String prefixPath) {
-    if (hasAnyPrefixLock()) {
-      throw new RuntimeException(
-          "Thread '" + Thread.currentThread().getName() +
-              "' cannot acquire prefix path lock while holding prefix " +
-              "path lock(s) for path: " + prefixPath + ".");
-    }
-    manager.lock(prefixPath);
-    myLocks.get().get(PREFIX_LOCK).incrementAndGet();
-  }
-
-  private boolean hasAnyPrefixLock() {
-    return myLocks.get().get(PREFIX_LOCK).get() != 0;
-  }
-
-  public void releasePrefixLock(String prefixPath) {
-    manager.unlock(prefixPath);
-    myLocks.get().get(PREFIX_LOCK).decrementAndGet();
-  }
-}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
index c76a757..2fdf543 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 import static org.apache.hadoop.ozone.security.OzoneSecurityException.ResultCodes.S3_SECRET_NOT_FOUND;
 
 /**
@@ -60,7 +61,7 @@ public class S3SecretManagerImpl implements S3SecretManager {
     Preconditions.checkArgument(Strings.isNotBlank(kerberosID),
         "kerberosID cannot be null or empty.");
     S3SecretValue result = null;
-    omMetadataManager.getLock().acquireS3SecretLock(kerberosID);
+    omMetadataManager.getLock().acquireLock(S3_SECRET_LOCK, kerberosID);
     try {
       S3SecretValue s3Secret =
           omMetadataManager.getS3SecretTable().get(kerberosID);
@@ -72,7 +73,7 @@ public class S3SecretManagerImpl implements S3SecretManager {
         return s3Secret;
       }
     } finally {
-      omMetadataManager.getLock().releaseS3SecretLock(kerberosID);
+      omMetadataManager.getLock().releaseLock(S3_SECRET_LOCK, kerberosID);
     }
     LOG.trace("Secret for accessKey:{}, proto:{}", kerberosID, result);
     return result;
@@ -86,7 +87,7 @@ public class S3SecretManagerImpl implements S3SecretManager {
     LOG.trace("Get secret for awsAccessKey:{}", kerberosID);
 
     S3SecretValue s3Secret;
-    omMetadataManager.getLock().acquireS3SecretLock(kerberosID);
+    omMetadataManager.getLock().acquireLock(S3_SECRET_LOCK, kerberosID);
     try {
       s3Secret = omMetadataManager.getS3SecretTable().get(kerberosID);
       if (s3Secret == null) {
@@ -94,7 +95,7 @@ public class S3SecretManagerImpl implements S3SecretManager {
             "awsAccessKeyId " + kerberosID, S3_SECRET_NOT_FOUND);
       }
     } finally {
-      omMetadataManager.getLock().releaseS3SecretLock(kerberosID);
+      omMetadataManager.getLock().releaseLock(S3_SECRET_LOCK, kerberosID);
     }
 
     return s3Secret.getAwsSecret();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 4267577..957437f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -91,21 +91,21 @@ public class OzoneManagerLock {
   /**
    * Acquire lock on resource.
    *
-   * For S3_Bucket, VOLUME, BUCKET type resource, same thread acquiring lock
-   * again is allowed.
+   * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
+   * thread acquiring lock again is allowed.
    *
-   * For USER, PREFIX, S3_SECRET type resource, same thread acquiring lock
-   * again is not allowed.
+   * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
+   * acquiring lock again is not allowed.
    *
-   * Special Note for UserLock: Single thread can acquire single user lock/
+   * Special Note for USER_LOCK: Single thread can acquire single user lock/
    * multi user lock. But not both at the same time.
    * @param resource - Type of the resource.
    * @param resources - Resource names on which user want to acquire lock.
-   * For Resource type bucket, first param should be volume, second param
+   * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
-  public void acquireLock(Resource resource, String... resources) {
+  public boolean acquireLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
     if (!resource.canLock(lockSet.get())) {
       String errorMessage = getErrorMessage(resource);
@@ -116,6 +116,7 @@ public class OzoneManagerLock {
       LOG.debug("Acquired {} lock on resource {}", resource.name,
           resourceName);
       lockSet.set(resource.setLock(lockSet.get()));
+      return true;
     }
   }
 
@@ -125,10 +126,10 @@ public class OzoneManagerLock {
    * @param resources
    */
   private String generateResourceName(Resource resource, String... resources) {
-    if (resources.length == 1 && resource != Resource.BUCKET) {
+    if (resources.length == 1 && resource != Resource.BUCKET_LOCK) {
       return OzoneManagerLockUtil.generateResourceLockName(resource,
           resources[0]);
-    } else if (resources.length == 2 && resource == Resource.BUCKET) {
+    } else if (resources.length == 2 && resource == Resource.BUCKET_LOCK) {
       return OzoneManagerLockUtil.generateBucketLockName(resources[0],
           resources[1]);
     } else {
@@ -160,8 +161,8 @@ public class OzoneManagerLock {
    * @param firstUser
    * @param secondUser
    */
-  public void acquireMultiUserLock(String firstUser, String secondUser) {
-    Resource resource = Resource.USER;
+  public boolean acquireMultiUserLock(String firstUser, String secondUser) {
+    Resource resource = Resource.USER_LOCK;
     firstUser = generateResourceName(resource, firstUser);
     secondUser = generateResourceName(resource, secondUser);
 
@@ -211,6 +212,7 @@ public class OzoneManagerLock {
       LOG.debug("Acquired {} lock on resource {} and {}", resource.name,
           firstUser, secondUser);
       lockSet.set(resource.setLock(lockSet.get()));
+      return true;
     }
   }
 
@@ -222,7 +224,7 @@ public class OzoneManagerLock {
    * @param secondUser
    */
   public void releaseMultiUserLock(String firstUser, String secondUser) {
-    Resource resource = Resource.USER;
+    Resource resource = Resource.USER_LOCK;
     firstUser = generateResourceName(resource, firstUser);
     secondUser = generateResourceName(resource, secondUser);
 
@@ -252,7 +254,7 @@ public class OzoneManagerLock {
    * Release lock on resource.
    * @param resource - Type of the resource.
    * @param resources - Resource names on which user want to acquire lock.
-   * For Resource type bucket, first param should be volume, second param
+   * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
@@ -274,21 +276,21 @@ public class OzoneManagerLock {
    */
   public enum Resource {
     // For S3 Bucket need to allow only for S3, that should be means only 1.
-    S3_BUCKET((byte) 0, "S3_BUCKET"), // = 1
+    S3_BUCKET_LOCK((byte) 0, "S3_BUCKET_LOCK"), // = 1
 
     // For volume need to allow both s3 bucket and volume. 01 + 10 = 11 (3)
-    VOLUME((byte) 1, "VOLUME"), // = 2
+    VOLUME_LOCK((byte) 1, "VOLUME_LOCK"), // = 2
 
     // For bucket we need to allow both s3 bucket, volume and bucket. Which
     // is equal to 100 + 010 + 001 = 111 = 4 + 2 + 1 = 7
-    BUCKET((byte) 2, "BUCKET"), // = 4
+    BUCKET_LOCK((byte) 2, "BUCKET_LOCK"), // = 4
 
     // For user we need to allow s3 bucket, volume, bucket and user lock.
     // Which is 8  4 + 2 + 1 = 15
-    USER((byte) 3, "USER"), // 15
+    USER_LOCK((byte) 3, "USER_LOCK"), // 15
 
-    S3_SECRET((byte) 4, "S3_SECRET"), // 31
-    PREFIX((byte) 5, "PREFIX"); //63
+    S3_SECRET_LOCK((byte) 4, "S3_SECRET_LOCK"), // 31
+    PREFIX_LOCK((byte) 5, "PREFIX_LOCK"); //63
 
     // level of the resource
     private byte lockLevel;
@@ -312,13 +314,13 @@ public class OzoneManagerLock {
 
     boolean canLock(short lockSetVal) {
 
-      // For USER, S3_SECRET and  PREFIX we shall not allow re-acquire locks at
-      // from single thread. 2nd condition is we have acquired one of these
-      // locks, but after that trying to acquire a lock with less than equal of
-      // lockLevel, we should disallow.
-      if (((USER.setMask & lockSetVal) == USER.setMask ||
-          (S3_SECRET.setMask & lockSetVal) == S3_SECRET.setMask ||
-          (PREFIX.setMask & lockSetVal) == PREFIX.setMask)
+      // For USER_LOCK, S3_SECRET_LOCK and PREFIX_LOCK we do not allow the
+      // same thread to re-acquire the lock. In addition, once a thread holds
+      // one of these locks, an attempt to acquire a lock with a lower or
+      // equal lockLevel is disallowed.
+      if (((USER_LOCK.setMask & lockSetVal) == USER_LOCK.setMask ||
+          (S3_SECRET_LOCK.setMask & lockSetVal) == S3_SECRET_LOCK.setMask ||
+          (PREFIX_LOCK.setMask & lockSetVal) == PREFIX_LOCK.setMask)
           && setMask <= lockSetVal) {
         return false;
       }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
index f26256c..78a42aa 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
@@ -42,15 +42,15 @@ final class OzoneManagerLockUtil {
   public static String generateResourceLockName(
       OzoneManagerLock.Resource resource, String resourceName) {
 
-    if (resource == OzoneManagerLock.Resource.S3_BUCKET) {
+    if (resource == OzoneManagerLock.Resource.S3_BUCKET_LOCK) {
       return OM_S3_PREFIX + resourceName;
-    } else if (resource == OzoneManagerLock.Resource.VOLUME) {
+    } else if (resource == OzoneManagerLock.Resource.VOLUME_LOCK) {
       return OM_KEY_PREFIX + resourceName;
-    } else if (resource == OzoneManagerLock.Resource.USER) {
+    } else if (resource == OzoneManagerLock.Resource.USER_LOCK) {
       return OM_USER_PREFIX + resourceName;
-    } else if (resource == OzoneManagerLock.Resource.S3_SECRET) {
+    } else if (resource == OzoneManagerLock.Resource.S3_SECRET_LOCK) {
       return OM_S3_SECRET + resourceName;
-    } else if (resource == OzoneManagerLock.Resource.PREFIX) {
+    } else if (resource == OzoneManagerLock.Resource.PREFIX_LOCK) {
       return OM_PREFIX + resourceName;
     } else {
       // This is for developers who mistakenly call this method with resource
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index bdc3d19..8438cbf 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -70,9 +70,9 @@ public class TestOzoneManagerLock {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
 
     // Lock re-acquire not allowed by same thread.
-    if (resource == OzoneManagerLock.Resource.USER ||
-        resource == OzoneManagerLock.Resource.S3_SECRET ||
-        resource == OzoneManagerLock.Resource.PREFIX){
+    if (resource == OzoneManagerLock.Resource.USER_LOCK ||
+        resource == OzoneManagerLock.Resource.S3_SECRET_LOCK ||
+        resource == OzoneManagerLock.Resource.PREFIX_LOCK){
       lock.acquireLock(resource, resourceName);
       try {
         lock.acquireLock(resource, resourceName);
@@ -198,7 +198,7 @@ public class TestOzoneManagerLock {
     OzoneManagerLock lock =
         new OzoneManagerLock(new OzoneConfiguration());
     try {
-      lock.releaseLock(OzoneManagerLock.Resource.USER, "user3");
+      lock.releaseLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
       fail("releaseLockWithOutAcquiringLock failed");
     } catch (IllegalMonitorStateException ex) {
       String message = "Releasing lock on resource $user3 without acquiring " +
@@ -209,7 +209,7 @@ public class TestOzoneManagerLock {
 
 
   private String[] generateResourceName(OzoneManagerLock.Resource resource) {
-    if (resource.getName() == OzoneManagerLock.Resource.BUCKET.getName()) {
+    if (resource == OzoneManagerLock.Resource.BUCKET_LOCK) {
       return new String[]{UUID.randomUUID().toString(),
           UUID.randomUUID().toString()};
     } else {
@@ -255,7 +255,8 @@ public class TestOzoneManagerLock {
       lock.acquireMultiUserLock("user1", "user2");
       fail("reAcquireMultiUserLock failed");
     } catch (RuntimeException ex) {
-      String message = "cannot acquire USER lock while holding [USER] lock(s).";
+      String message = "cannot acquire USER_LOCK lock while holding " +
+          "[USER_LOCK] lock(s).";
       Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
     }
     lock.releaseMultiUserLock("user1", "user2");
@@ -264,15 +265,16 @@ public class TestOzoneManagerLock {
   @Test
   public void acquireMultiUserLockAfterUserLock() {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireLock(OzoneManagerLock.Resource.USER, "user3");
+    lock.acquireLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
     try {
       lock.acquireMultiUserLock("user1", "user2");
       fail("acquireMultiUserLockAfterUserLock failed");
     } catch (RuntimeException ex) {
-      String message = "cannot acquire USER lock while holding [USER] lock(s).";
+      String message = "cannot acquire USER_LOCK lock while holding " +
+          "[USER_LOCK] lock(s).";
       Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
     }
-    lock.releaseLock(OzoneManagerLock.Resource.USER, "user3");
+    lock.releaseLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
   }
 
   @Test
@@ -280,10 +282,11 @@ public class TestOzoneManagerLock {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
     lock.acquireMultiUserLock("user1", "user2");
     try {
-      lock.acquireLock(OzoneManagerLock.Resource.USER, "user3");
+      lock.acquireLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
       fail("acquireUserLockAfterMultiUserLock failed");
     } catch (RuntimeException ex) {
-      String message = "cannot acquire USER lock while holding [USER] lock(s).";
+      String message = "cannot acquire USER_LOCK lock while holding " +
+          "[USER_LOCK] lock(s).";
       Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
     }
     lock.releaseMultiUserLock("user1", "user2");
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index ea8f5f0..38c576a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -44,7 +44,8 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
-
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 /**
  * OM bucket manager.
  */
@@ -114,9 +115,11 @@ public class BucketManagerImpl implements BucketManager {
     Preconditions.checkNotNull(bucketInfo);
     String volumeName = bucketInfo.getVolumeName();
     String bucketName = bucketInfo.getBucketName();
-    metadataManager.getLock().acquireVolumeLock(volumeName);
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    boolean acquiredBucketLock = false;
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volumeName);
     try {
+      acquiredBucketLock = metadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
       String volumeKey = metadataManager.getVolumeKey(volumeName);
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
 
@@ -182,8 +185,11 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
-      metadataManager.getLock().releaseVolumeLock(volumeName);
+      if (acquiredBucketLock) {
+        metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volumeName);
     }
   }
 
@@ -207,7 +213,7 @@ public class BucketManagerImpl implements BucketManager {
       throws IOException {
     Preconditions.checkNotNull(volumeName);
     Preconditions.checkNotNull(bucketName);
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       OmBucketInfo value = metadataManager.getBucketTable().get(bucketKey);
@@ -225,7 +231,8 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -240,7 +247,7 @@ public class BucketManagerImpl implements BucketManager {
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       OmBucketInfo oldBucketInfo =
@@ -297,7 +304,8 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -334,7 +342,7 @@ public class BucketManagerImpl implements BucketManager {
       throws IOException {
     Preconditions.checkNotNull(volumeName);
     Preconditions.checkNotNull(bucketName);
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       //Check if bucket exists
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -357,7 +365,8 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -397,7 +406,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -452,7 +461,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
 
     return true;
@@ -476,7 +485,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -518,7 +527,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
 
     return true;
@@ -542,7 +551,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -571,7 +580,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
 
     return true;
@@ -593,7 +602,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -611,7 +620,7 @@ public class BucketManagerImpl implements BucketManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a29212c..75d1af5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -118,6 +118,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -446,9 +447,8 @@ public class KeyManagerImpl implements KeyManager {
         args.getVolumeName(), args.getBucketName(), args.getKeyName());
 
     FileEncryptionInfo encInfo;
-
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
-      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
       OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
       encInfo = getFileEncryptionInfo(bucketInfo);
       keyInfo = prepareKeyInfo(args, dbKeyName, size, locations, encInfo);
@@ -459,7 +459,8 @@ public class KeyManagerImpl implements KeyManager {
           volumeName, bucketName, keyName, ex);
       throw new OMException(ex.getMessage(), ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
     if (keyInfo == null) {
       // the key does not exist, create a new object, the new blocks are the
@@ -558,7 +559,7 @@ public class KeyManagerImpl implements KeyManager {
     // check?
     validateBucket(volumeName, bucketName);
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     String keyName = omKeyArgs.getKeyName();
 
     // TODO: here if on OM machines clocks are skewed and there is a chance
@@ -577,7 +578,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -628,7 +630,8 @@ public class KeyManagerImpl implements KeyManager {
         .getOpenKey(volumeName, bucketName, keyName, clientID);
     Preconditions.checkNotNull(locationInfoList);
     try {
-      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+      metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+          bucketName);
       validateBucket(volumeName, bucketName);
       OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
       if (keyInfo == null) {
@@ -654,7 +657,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -664,7 +668,7 @@ public class KeyManagerImpl implements KeyManager {
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       String keyBytes = metadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
@@ -720,7 +724,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           KEY_NOT_FOUND);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -738,7 +743,7 @@ public class KeyManagerImpl implements KeyManager {
           ResultCodes.INVALID_KEY_NAME);
     }
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       // fromKeyName should exist
       String fromKey = metadataManager.getOzoneKey(
@@ -791,7 +796,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.KEY_RENAME_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -801,7 +807,7 @@ public class KeyManagerImpl implements KeyManager {
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       String objectKey = metadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
@@ -829,7 +835,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(), ex,
           ResultCodes.KEY_DELETION_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -902,7 +909,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = keyArgs.getBucketName();
     String keyName = keyArgs.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     validateS3Bucket(volumeName, bucketName);
     try {
 
@@ -961,7 +968,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -975,7 +983,7 @@ public class KeyManagerImpl implements KeyManager {
     String uploadID = omKeyArgs.getMultipartUploadID();
     int partNumber = omKeyArgs.getMultipartUploadPartNumber();
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     validateS3Bucket(volumeName, bucketName);
     String partName;
     try {
@@ -1047,7 +1055,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(),
           ResultCodes.MULTIPART_UPLOAD_PARTFILE_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     return new OmMultipartCommitUploadPartInfo(partName);
@@ -1055,6 +1064,7 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
+  @SuppressWarnings("methodlength")
   public OmMultipartUploadCompleteInfo completeMultipartUpload(
       OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
       throws IOException {
@@ -1064,7 +1074,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = omKeyArgs.getBucketName();
     String keyName = omKeyArgs.getKeyName();
     String uploadID = omKeyArgs.getMultipartUploadID();
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     validateS3Bucket(volumeName, bucketName);
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
@@ -1204,7 +1214,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(), ResultCodes
           .COMPLETE_MULTIPART_UPLOAD_ERROR);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -1218,7 +1229,7 @@ public class KeyManagerImpl implements KeyManager {
     String uploadID = omKeyArgs.getMultipartUploadID();
     Preconditions.checkNotNull(uploadID, "uploadID cannot be null");
     validateS3Bucket(volumeName, bucketName);
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
 
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
@@ -1268,7 +1279,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(), ResultCodes
           .ABORT_MULTIPART_UPLOAD_FAILED);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
   }
@@ -1285,7 +1297,7 @@ public class KeyManagerImpl implements KeyManager {
     boolean isTruncated = false;
     int nextPartNumberMarker = 0;
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
           bucketName, keyName, uploadID);
@@ -1346,7 +1358,8 @@ public class KeyManagerImpl implements KeyManager {
       throw new OMException(ex.getMessage(), ResultCodes
               .LIST_MULTIPART_UPLOAD_PARTS_FAILED);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -1365,7 +1378,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1425,7 +1438,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
     return true;
   }
@@ -1445,7 +1458,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1513,7 +1526,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
     return true;
   }
@@ -1533,7 +1546,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1576,7 +1589,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
     return true;
   }
@@ -1594,7 +1607,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1619,7 +1632,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volume, bucket);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
   }
 
@@ -1664,7 +1677,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
@@ -1702,7 +1715,8 @@ public class KeyManagerImpl implements KeyManager {
           volumeName + " bucket: " + bucketName + " key: " + keyName,
           ResultCodes.FILE_NOT_FOUND);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -1722,7 +1736,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
 
       // Check if this is the root of the filesystem.
@@ -1744,7 +1758,8 @@ public class KeyManagerImpl implements KeyManager {
           .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName());
       metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
   }
 
@@ -1796,7 +1811,7 @@ public class KeyManagerImpl implements KeyManager {
     String keyName = args.getKeyName();
     OpenKeySession keySession;
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       OzoneFileStatus fileStatus;
       try {
@@ -1822,7 +1837,8 @@ public class KeyManagerImpl implements KeyManager {
       // filestatus. We can avoid some operations in openKey call.
       keySession = openKey(args);
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     return keySession;
@@ -1844,7 +1860,7 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       OzoneFileStatus fileStatus = getFileStatus(args);
       if (fileStatus.isFile()) {
@@ -1852,7 +1868,8 @@ public class KeyManagerImpl implements KeyManager {
       }
       //if key is not of type file or if key is not found we throw an exception
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     throw new OMException("Can not write to directory: " + keyName,
@@ -1878,7 +1895,7 @@ public class KeyManagerImpl implements KeyManager {
     String keyName = args.getKeyName();
 
     List<OzoneFileStatus> fileStatusList = new ArrayList<>();
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
     try {
       if (Strings.isNullOrEmpty(startKey)) {
         OzoneFileStatus fileStatus = getFileStatus(args);
@@ -1940,7 +1957,8 @@ public class KeyManagerImpl implements KeyManager {
         }
       }
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
     return fileStatusList;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index d351ed9..0741777 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.utils.db.DBStore;
@@ -403,7 +404,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * @return OzoneManagerLock
    */
   @Override
-  public OzoneManagerLock getLock() {
+  public org.apache.hadoop.ozone.om.lock.OzoneManagerLock getLock() {
     return lock;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e472deb..8032b6d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -210,6 +210,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_D
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneManagerService
     .newReflectiveBlockingService;
@@ -2631,13 +2633,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   public void createS3Bucket(String userName, String s3BucketName)
       throws IOException {
+
+    boolean acquiredS3Lock = false;
+    boolean acquiredVolumeLock = false;
     try {
       if(isAclEnabled) {
         checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.CREATE,
             null, s3BucketName, null);
       }
       metrics.incNumBucketCreates();
+      acquiredS3Lock = metadataManager.getLock().acquireLock(S3_BUCKET_LOCK,
+          s3BucketName);
       try {
+        acquiredVolumeLock = metadataManager.getLock().acquireLock(VOLUME_LOCK,
+            s3BucketManager.formatOzoneVolumeName(userName));
         boolean newVolumeCreate = s3BucketManager.createOzoneVolumeIfNeeded(
             userName);
         if (newVolumeCreate) {
@@ -2652,12 +2661,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         metrics.incNumVolumeCreateFails();
         throw ex;
       }
-
       s3BucketManager.createS3Bucket(userName, s3BucketName);
       metrics.incNumBuckets();
     } catch (IOException ex) {
       metrics.incNumBucketCreateFails();
       throw ex;
+    } finally {
+      if (acquiredVolumeLock) {
+        metadataManager.getLock().releaseLock(VOLUME_LOCK,
+            s3BucketManager.formatOzoneVolumeName(userName));
+      }
+      if (acquiredS3Lock) {
+        metadataManager.getLock().releaseLock(S3_BUCKET_LOCK, s3BucketName);
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
index b9aff89..ff71a88 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PREFIX_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.PREFIX_LOCK;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
 
 /**
@@ -91,7 +92,7 @@ public class PrefixManagerImpl implements PrefixManager {
     validateOzoneObj(obj);
 
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -135,7 +136,7 @@ public class PrefixManagerImpl implements PrefixManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releasePrefixLock(prefixPath);
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     return true;
   }
@@ -152,7 +153,7 @@ public class PrefixManagerImpl implements PrefixManager {
   public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -205,7 +206,7 @@ public class PrefixManagerImpl implements PrefixManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releasePrefixLock(prefixPath);
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     return true;
   }
@@ -222,7 +223,7 @@ public class PrefixManagerImpl implements PrefixManager {
   public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -241,7 +242,7 @@ public class PrefixManagerImpl implements PrefixManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releasePrefixLock(prefixPath);
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     return true;
   }
@@ -256,7 +257,7 @@ public class PrefixManagerImpl implements PrefixManager {
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
       String longestPrefix = prefixTree.getLongestPrefix(prefixPath);
       if (prefixPath.equals(longestPrefix)) {
@@ -267,7 +268,7 @@ public class PrefixManagerImpl implements PrefixManager {
         }
       }
     } finally {
-      metadataManager.getLock().releasePrefixLock(prefixPath);
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     return EMPTY_ACL_LIST;
   }
@@ -275,12 +276,12 @@ public class PrefixManagerImpl implements PrefixManager {
   @Override
   public List<OmPrefixInfo> getLongestPrefixPath(String path) {
     String prefixPath = prefixTree.getLongestPrefix(path);
-    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
       return prefixTree.getLongestPrefixPath(prefixPath).stream()
           .map(c -> c.getValue()).collect(Collectors.toList());
     } finally {
-      metadataManager.getLock().releasePrefixLock(prefixPath);
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManager.java
index 9e3523f..dfd0ac3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManager.java
@@ -78,4 +78,10 @@ public interface S3BucketManager {
    * @throws IOException - incase of volume creation failure.
    */
   boolean createOzoneVolumeIfNeeded(String userName) throws IOException;
+
+  /**
+   * Returns the Ozone volume name derived from the given user name.
+   * @param userName - user name the volume name is derived from.
+   */
+  String formatOzoneVolumeName(String userName);
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
index c234266..6fe08811 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+
 import org.apache.logging.log4j.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.util.Objects;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_VOLUME_PREFIX;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_BUCKET_LOCK;
 
 /**
  * S3 Bucket Manager, this class maintains a mapping between S3 Bucket and Ozone
@@ -101,10 +103,9 @@ public class S3BucketManagerImpl implements S3BucketManager {
     // anonymous access to bucket where the user name is absent.
     String ozoneVolumeName = formatOzoneVolumeName(userName);
 
-    omMetadataManager.getLock().acquireS3Lock(bucketName);
+    omMetadataManager.getLock().acquireLock(S3_BUCKET_LOCK, bucketName);
     try {
-      String bucket =
-          omMetadataManager.getS3Table().get(bucketName);
+      String bucket = omMetadataManager.getS3Table().get(bucketName);
 
       if (bucket != null) {
         LOG.debug("Bucket already exists. {}", bucketName);
@@ -119,8 +120,9 @@ public class S3BucketManagerImpl implements S3BucketManager {
 
       omMetadataManager.getS3Table().put(bucketName, finalName);
     } finally {
-      omMetadataManager.getLock().releaseS3Lock(bucketName);
+      omMetadataManager.getLock().releaseLock(S3_BUCKET_LOCK, bucketName);
     }
+
   }
 
   @Override
@@ -128,7 +130,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
     Preconditions.checkArgument(
         Strings.isNotBlank(bucketName), "Bucket name cannot be null or empty");
 
-    omMetadataManager.getLock().acquireS3Lock(bucketName);
+    omMetadataManager.getLock().acquireLock(S3_BUCKET_LOCK, bucketName);
     try {
       String map = omMetadataManager.getS3Table().get(bucketName);
 
@@ -142,12 +144,13 @@ public class S3BucketManagerImpl implements S3BucketManager {
     } catch(IOException ex) {
       throw ex;
     } finally {
-      omMetadataManager.getLock().releaseS3Lock(bucketName);
+      omMetadataManager.getLock().releaseLock(S3_BUCKET_LOCK, bucketName);
     }
 
   }
 
-  private String formatOzoneVolumeName(String userName) {
+  @Override
+  public String formatOzoneVolumeName(String userName) {
     return String.format(OM_S3_VOLUME_PREFIX + "%s", userName);
   }
 
@@ -207,7 +210,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
     Preconditions.checkArgument(s3BucketName.length() >=3 &&
         s3BucketName.length() < 64,
         "Length of the S3 Bucket is not correct.");
-    omMetadataManager.getLock().acquireS3Lock(s3BucketName);
+    omMetadataManager.getLock().acquireLock(S3_BUCKET_LOCK, s3BucketName);
     try {
       String mapping = omMetadataManager.getS3Table().get(s3BucketName);
       if (mapping != null) {
@@ -216,7 +219,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
       throw new OMException("No such S3 bucket.",
           OMException.ResultCodes.S3_BUCKET_NOT_FOUND);
     } finally {
-      omMetadataManager.getLock().releaseS3Lock(s3BucketName);
+      omMetadataManager.getLock().releaseLock(S3_BUCKET_LOCK, s3BucketName);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index 6ff289a..04acd8b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -37,6 +37,8 @@ import com.google.common.base.Preconditions;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,9 +122,13 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public VolumeList createVolume(OmVolumeArgs omVolumeArgs) throws IOException {
     Preconditions.checkNotNull(omVolumeArgs);
-    metadataManager.getLock().acquireUserLock(omVolumeArgs.getOwnerName());
-    metadataManager.getLock().acquireVolumeLock(omVolumeArgs.getVolume());
+
+    boolean acquiredUserLock = false;
+    metadataManager.getLock().acquireLock(VOLUME_LOCK,
+        omVolumeArgs.getVolume());
     try {
+      acquiredUserLock = metadataManager.getLock().acquireLock(USER_LOCK,
+          omVolumeArgs.getOwnerName());
       String dbVolumeKey = metadataManager.getVolumeKey(
           omVolumeArgs.getVolume());
       String dbUserKey = metadataManager.getUserKey(
@@ -156,8 +162,12 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(omVolumeArgs.getVolume());
-      metadataManager.getLock().releaseUserLock(omVolumeArgs.getOwnerName());
+      if (acquiredUserLock) {
+        metadataManager.getLock().releaseLock(USER_LOCK,
+            omVolumeArgs.getOwnerName());
+      }
+      metadataManager.getLock().releaseLock(VOLUME_LOCK,
+          omVolumeArgs.getVolume());
     }
   }
 
@@ -206,8 +216,9 @@ public class VolumeManagerImpl implements VolumeManager {
       throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(owner);
-    metadataManager.getLock().acquireUserLock(owner);
-    metadataManager.getLock().acquireVolumeLock(volume);
+    boolean acquiredUsersLock = false;
+    String actualOwner = null;
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs = metadataManager
@@ -221,8 +232,11 @@ public class VolumeManagerImpl implements VolumeManager {
 
       Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
 
-      String originalOwner =
-          metadataManager.getUserKey(volumeArgs.getOwnerName());
+      actualOwner = volumeArgs.getOwnerName();
+      String originalOwner = metadataManager.getUserKey(actualOwner);
+      // Lock the raw owner names; the finally block releases this same pair.
+      acquiredUsersLock = metadataManager.getLock().acquireMultiUserLock(owner,
+          actualOwner);
       VolumeList oldOwnerVolumeList = delVolumeFromOwnerList(volume,
           originalOwner);
 
@@ -243,8 +257,10 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
-      metadataManager.getLock().releaseUserLock(owner);
+      if (acquiredUsersLock) {
+        metadataManager.getLock().releaseMultiUserLock(owner, actualOwner);
+      }
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
 
@@ -300,7 +316,7 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public OmVolumeArgs setQuota(String volume, long quota) throws IOException {
     Preconditions.checkNotNull(volume);
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -326,7 +342,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
 
@@ -352,7 +368,7 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
     Preconditions.checkNotNull(volume);
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -370,7 +386,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
 
@@ -385,16 +401,13 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public OmDeleteVolumeResponse deleteVolume(String volume) throws IOException {
     Preconditions.checkNotNull(volume);
-    String owner;
-    metadataManager.getLock().acquireVolumeLock(volume);
+    String owner = null;
+    boolean acquiredUserLock = false;
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       owner = getVolumeInfo(volume).getOwnerName();
-    } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
-    }
-    metadataManager.getLock().acquireUserLock(owner);
-    metadataManager.getLock().acquireVolumeLock(volume);
-    try {
+      acquiredUserLock = metadataManager.getLock().acquireLock(USER_LOCK,
+          owner);
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
           metadataManager.getVolumeTable().get(dbVolumeKey);
@@ -425,8 +438,11 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
-      metadataManager.getLock().releaseUserLock(owner);
+      if (acquiredUserLock) {
+        metadataManager.getLock().releaseLock(USER_LOCK, owner);
+      }
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+
     }
   }
 
@@ -472,7 +488,7 @@ public class VolumeManagerImpl implements VolumeManager {
       throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(userAcl);
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -493,7 +509,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
 
@@ -503,12 +519,12 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
       String prefix, String startKey, int maxKeys) throws IOException {
-    metadataManager.getLock().acquireUserLock(userName);
+    metadataManager.getLock().acquireLock(USER_LOCK, userName);
     try {
       return metadataManager.listVolumes(
           userName, prefix, startKey, maxKeys);
     } finally {
-      metadataManager.getLock().releaseUserLock(userName);
+      metadataManager.getLock().releaseLock(USER_LOCK, userName);
     }
   }
 
@@ -529,7 +545,7 @@ public class VolumeManagerImpl implements VolumeManager {
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -556,7 +572,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -579,7 +595,7 @@ public class VolumeManagerImpl implements VolumeManager {
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -606,7 +622,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -630,7 +646,7 @@ public class VolumeManagerImpl implements VolumeManager {
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -652,7 +668,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -673,7 +689,7 @@ public class VolumeManagerImpl implements VolumeManager {
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireVolumeLock(volume);
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -692,7 +708,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseVolumeLock(volume);
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
index c4e72c6..9001068 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
@@ -58,6 +58,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CryptoProtocolVersionProto.ENCRYPTION_ZONES;
 
@@ -144,10 +146,12 @@ public class OMBucketCreateRequest extends OMClientRequest {
     String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
 
     IOException exception = null;
-    metadataManager.getLock().acquireVolumeLock(volumeName);
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
-    try {
+    boolean acquiredBucketLock = false;
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volumeName);
 
+    try {
+      acquiredBucketLock = metadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
       //Check if the volume exists
       if (metadataManager.getVolumeTable().get(volumeKey) == null) {
         LOG.debug("volume: {} not found ", volumeName);
@@ -169,8 +173,11 @@ public class OMBucketCreateRequest extends OMClientRequest {
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
-      metadataManager.getLock().releaseVolumeLock(volumeName);
+      if (acquiredBucketLock) {
+        metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volumeName);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
index 853bb72..2ead69e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
 /**
  * Handles DeleteBucket Request.
  */
@@ -102,7 +104,8 @@ public class OMBucketDeleteRequest extends OMClientRequest {
 
     IOException exception = null;
     // acquire lock
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       // No need to check volume exists here, as bucket cannot be created
       // with out volume creation.
@@ -131,7 +134,8 @@ public class OMBucketDeleteRequest extends OMClientRequest {
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
index fa8c939..55f55c4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
@@ -58,6 +58,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
 /**
  * Handle SetBucketProperty Request.
  */
@@ -120,7 +122,8 @@ public class OMBucketSetPropertyRequest extends OMClientRequest {
     IOException exception = null;
 
     // acquire lock
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
 
     try {
 
@@ -182,7 +185,8 @@ public class OMBucketSetPropertyRequest extends OMClientRequest {
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index cb97ba1..143abef 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -55,10 +55,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
-
-
-
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
  * Handles CommitKey request.
@@ -140,7 +138,8 @@ public class OMKeyCommitRequest extends OMClientRequest
     String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
         keyName, commitKeyRequest.getClientID());
 
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
 
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
@@ -170,7 +169,8 @@ public class OMKeyCommitRequest extends OMClientRequest
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index ce04e24..be9c149 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.utils.UniqueId;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
 /**
  * Handles CreateKey request.
  */
@@ -205,7 +207,8 @@ public class OMKeyCreateRequest extends OMClientRequest
     FileEncryptionInfo encryptionInfo = null;
     long openVersion = 0L;
     IOException exception = null;
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       //TODO: We can optimize this get here, if getKmsProvider is null, then
@@ -221,7 +224,8 @@ public class OMKeyCreateRequest extends OMClientRequest
           volumeName, bucketName, keyName, ex);
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index 429f97c..9848817 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
     .KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
  * Handles DeleteKey request.
@@ -111,7 +112,8 @@ public class OMKeyDeleteRequest extends OMClientRequest
     String objectKey = omMetadataManager.getOzoneKey(
         volumeName, bucketName, keyName);
 
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
     IOException exception = null;
     OmKeyInfo omKeyInfo = null;
     try {
@@ -140,7 +142,8 @@ public class OMKeyDeleteRequest extends OMClientRequest
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
index 644b60d..5b6df45 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
@@ -51,6 +51,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
  * Handles rename key request.
@@ -131,7 +132,8 @@ public class OMKeyRenameRequest extends OMClientRequest
     }
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+        bucketName);
 
     IOException exception = null;
     OmKeyInfo fromKeyValue = null;
@@ -179,7 +181,8 @@ public class OMKeyRenameRequest extends OMClientRequest
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+          bucketName);
     }
 
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
index 993cd9d..cbfacae 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
@@ -54,6 +54,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .VolumeList;
 import org.apache.hadoop.util.Time;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+
 /**
  * Handles volume create request.
  */
@@ -134,13 +137,14 @@ public class OMVolumeCreateRequest extends OMClientRequest
     String dbUserKey = omMetadataManager.getUserKey(owner);
     String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
     VolumeList volumeList = null;
+    boolean acquiredUserLock = false;
+    IOException exception = null;
 
     // acquire lock.
-    omMetadataManager.getLock().acquireUserLock(owner);
-    omMetadataManager.getLock().acquireVolumeLock(volume);
-
-    IOException exception = null;
+    omMetadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
+      acquiredUserLock = omMetadataManager.getLock().acquireLock(USER_LOCK,
+          owner);
       OmVolumeArgs dbVolumeArgs =
           omMetadataManager.getVolumeTable().get(dbVolumeKey);
 
@@ -166,8 +170,10 @@ public class OMVolumeCreateRequest extends OMClientRequest
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseVolumeLock(volumeInfo.getVolume());
-      omMetadataManager.getLock().releaseUserLock(dbUserKey);
+      if (acquiredUserLock) {
+        omMetadataManager.getLock().releaseLock(USER_LOCK, owner);
+      }
+      omMetadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
index 65f7df5..08c7991 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
-
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
 /**
  * Handles volume delete request.
  */
@@ -104,37 +105,19 @@ public class OMVolumeDeleteRequest extends OMClientRequest
 
     OmVolumeArgs omVolumeArgs = null;
     String owner = null;
+    boolean acquiredUserLock = false;
+    IOException exception = null;
+    OzoneManagerProtocolProtos.VolumeList newVolumeList = null;
 
-    omMetadataManager.getLock().acquireVolumeLock(volume);
+    omMetadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       owner = getVolumeInfo(omMetadataManager, volume).getOwnerName();
-    } catch (IOException ex) {
-      LOG.error("Volume deletion failed for volume:{}", volume, ex);
-      omMetrics.incNumVolumeDeleteFails();
-      auditLog(auditLogger, buildAuditMessage(OMAction.DELETE_VOLUME,
-          buildVolumeAuditMap(volume), ex, userInfo));
-      return new OMVolumeDeleteResponse(null, null, null,
-          createErrorOMResponse(omResponse, ex));
-    } finally {
-      omMetadataManager.getLock().releaseVolumeLock(volume);
-    }
-
-    // Release and reacquire lock for now it will not be a problem for now, as
-    // applyTransaction serializes the operation's.
-    // TODO: Revisit this logic once HDDS-1672 checks in.
-
-    // We cannot acquire user lock holding volume lock, so released volume
-    // lock, and acquiring user and volume lock.
+      acquiredUserLock = omMetadataManager.getLock().acquireLock(USER_LOCK,
+          owner);
 
-    omMetadataManager.getLock().acquireUserLock(owner);
-    omMetadataManager.getLock().acquireVolumeLock(volume);
-
-    String dbUserKey = omMetadataManager.getUserKey(owner);
-    String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+      String dbUserKey = omMetadataManager.getUserKey(owner);
+      String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
 
-    IOException exception = null;
-    OzoneManagerProtocolProtos.VolumeList newVolumeList = null;
-    try {
       if (!omMetadataManager.isVolumeEmpty(volume)) {
         LOG.debug("volume:{} is not empty", volume);
         throw new OMException(OMException.ResultCodes.VOLUME_NOT_EMPTY);
@@ -155,10 +138,11 @@ public class OMVolumeDeleteRequest extends OMClientRequest
 
     } catch (IOException ex) {
       exception = ex;
-
     } finally {
-      omMetadataManager.getLock().releaseVolumeLock(volume);
-      omMetadataManager.getLock().releaseUserLock(owner);
+      if (acquiredUserLock) {
+        omMetadataManager.getLock().releaseLock(USER_LOCK, owner);
+      }
+      omMetadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
index 0de2124..cc972d9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
 /**
  * Handle set owner request for volume.
  */
@@ -123,11 +125,9 @@ public class OMVolumeSetOwnerRequest extends OMClientRequest
     OzoneManagerProtocolProtos.VolumeList newOwnerVolumeList = null;
     OmVolumeArgs omVolumeArgs = null;
     IOException exception = null;
+    boolean acquiredUserLocks = false;
 
-    omMetadataManager.getLock().acquireUserLock(newOwner);
-    omMetadataManager.getLock().acquireVolumeLock(volume);
-
-    boolean needToreleaseOldOwnerLock = false;
+    omMetadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       omVolumeArgs = omMetadataManager.getVolumeTable().get(dbVolumeKey);
 
@@ -140,25 +140,15 @@ public class OMVolumeSetOwnerRequest extends OMClientRequest
 
       oldOwner = omVolumeArgs.getOwnerName();
 
+      acquiredUserLocks =
+          omMetadataManager.getLock().acquireMultiUserLock(newOwner, oldOwner);
 
-      // Release and reacquire lock for now it will not be a problem, as
-      // applyTransaction serializes the operation's.
-      // TODO: Revisit this logic once HDDS-1672 checks in.
-
-      // releasing volume lock, as to acquire user lock we need to release
-      // volume lock.
-      omMetadataManager.getLock().releaseVolumeLock(volume);
-      omMetadataManager.getLock().acquireUserLock(oldOwner);
-      omMetadataManager.getLock().acquireVolumeLock(volume);
-
-      needToreleaseOldOwnerLock = true;
       oldOwnerVolumeList =
           omMetadataManager.getUserTable().get(oldOwner);
 
       oldOwnerVolumeList = delVolumeFromOwnerList(
           oldOwnerVolumeList, volume, oldOwner);
 
-
       newOwnerVolumeList = omMetadataManager.getUserTable().get(newOwner);
       newOwnerVolumeList = addVolumeToOwnerList(
           newOwnerVolumeList, volume, newOwner, maxUserVolumeCount);
@@ -182,11 +172,10 @@ public class OMVolumeSetOwnerRequest extends OMClientRequest
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseVolumeLock(volume);
-      omMetadataManager.getLock().releaseUserLock(newOwner);
-      if (needToreleaseOldOwnerLock) {
-        omMetadataManager.getLock().releaseUserLock(oldOwner);
+      if (acquiredUserLocks) {
+        omMetadataManager.getLock().releaseMultiUserLock(newOwner, oldOwner);
       }
+      omMetadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
index 1d7097f..a5430aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
 
 /**
  * Handles set Quota request for volume.
@@ -121,7 +123,7 @@ public class OMVolumeSetQuotaRequest extends OMClientRequest {
     IOException exception = null;
     OmVolumeArgs omVolumeArgs = null;
 
-    omMetadataManager.getLock().acquireVolumeLock(volume);
+    omMetadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
       omVolumeArgs = omMetadataManager.getVolumeTable().get(dbVolumeKey);
@@ -141,7 +143,7 @@ public class OMVolumeSetQuotaRequest extends OMClientRequest {
     } catch (IOException ex) {
       exception = ex;
     } finally {
-      omMetadataManager.getLock().releaseVolumeLock(volume);
+      omMetadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
 
     // Performing audit logging outside of the lock.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerLock.java
deleted file mode 100644
index 2def5c4..0000000
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerLock.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Contains test-cases to verify OzoneManagerLock.
- */
-public class TestOzoneManagerLock {
-
-  @Test(timeout = 1000)
-  public void testDifferentUserLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireUserLock("userOne");
-    lock.acquireUserLock("userTwo");
-    lock.releaseUserLock("userOne");
-    lock.releaseUserLock("userTwo");
-    Assert.assertTrue(true);
-  }
-
-  @Test
-  public void testSameUserLock() throws Exception {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireUserLock("userOne");
-    AtomicBoolean gotLock = new AtomicBoolean(false);
-    new Thread(() -> {
-      lock.acquireUserLock("userOne");
-      gotLock.set(true);
-      lock.releaseUserLock("userOne");
-    }).start();
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    // Since the new thread is trying to get lock on same user, it will wait.
-    Assert.assertFalse(gotLock.get());
-    lock.releaseUserLock("userOne");
-    // Since we have released the lock, the new thread should have the lock
-    // now
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    Assert.assertTrue(gotLock.get());
-  }
-
-  @Test(timeout = 1000)
-  public void testDifferentVolumeLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireVolumeLock("volOne");
-    lock.acquireVolumeLock("volTwo");
-    lock.releaseVolumeLock("volOne");
-    lock.releaseVolumeLock("volTwo");
-    Assert.assertTrue(true);
-  }
-
-  @Test
-  public void testSameVolumeLock() throws Exception {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireVolumeLock("volOne");
-    AtomicBoolean gotLock = new AtomicBoolean(false);
-    new Thread(() -> {
-      lock.acquireVolumeLock("volOne");
-      gotLock.set(true);
-      lock.releaseVolumeLock("volOne");
-    }).start();
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    // Since the new thread is trying to get lock on same user, it will wait.
-    Assert.assertFalse(gotLock.get());
-    lock.releaseVolumeLock("volOne");
-    // Since we have released the lock, the new thread should have the lock
-    // now
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    Assert.assertTrue(gotLock.get());
-  }
-
-  @Test(timeout = 1000)
-  public void testDifferentBucketLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireBucketLock("volOne", "bucketOne");
-    lock.acquireBucketLock("volOne", "bucketTwo");
-    lock.releaseBucketLock("volOne", "bucketTwo");
-    lock.releaseBucketLock("volOne", "bucketOne");
-    Assert.assertTrue(true);
-  }
-
-  @Test
-  public void testSameBucketLock() throws Exception {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireBucketLock("volOne", "bucketOne");
-    AtomicBoolean gotLock = new AtomicBoolean(false);
-    new Thread(() -> {
-      lock.acquireBucketLock("volOne", "bucketOne");
-      gotLock.set(true);
-      lock.releaseBucketLock("volOne", "bucketOne");
-    }).start();
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    // Since the new thread is trying to get lock on same user, it will wait.
-    Assert.assertFalse(gotLock.get());
-    lock.releaseBucketLock("volOne", "bucketOne");
-    // Since we have released the lock, the new thread should have the lock
-    // now
-    // Let's give some time for the new thread to run
-    Thread.sleep(100);
-    Assert.assertTrue(gotLock.get());
-  }
-
-  @Test(timeout = 1000)
-  public void testVolumeLockAfterUserLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireUserLock("userOne");
-    lock.acquireVolumeLock("volOne");
-    lock.releaseVolumeLock("volOne");
-    lock.releaseUserLock("userOne");
-    Assert.assertTrue(true);
-  }
-
-  @Test(timeout = 1000)
-  public void testBucketLockAfterVolumeLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireVolumeLock("volOne");
-    lock.acquireBucketLock("volOne", "bucketOne");
-    lock.releaseBucketLock("volOne", "bucketOne");
-    lock.releaseVolumeLock("volOne");
-    Assert.assertTrue(true);
-  }
-
-  @Test(timeout = 1000)
-  public void testBucketLockAfterVolumeLockAfterUserLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireUserLock("userOne");
-    lock.acquireVolumeLock("volOne");
-    lock.acquireBucketLock("volOne", "bucketOne");
-    lock.releaseBucketLock("volOne", "bucketOne");
-    lock.releaseVolumeLock("volOne");
-    lock.releaseUserLock("userOne");
-    Assert.assertTrue(true);
-  }
-
-  @Test
-  public void testUserLockAfterVolumeLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireVolumeLock("volOne");
-    try {
-      lock.acquireUserLock("userOne");
-      Assert.fail();
-    } catch (RuntimeException ex) {
-      String msg =
-          "cannot acquire user lock while holding " +
-              "volume, bucket or S3 bucket lock(s).";
-      Assert.assertTrue(ex.getMessage().contains(msg));
-    }
-    lock.releaseVolumeLock("volOne");
-    Assert.assertTrue(true);
-  }
-
-  @Test
-  public void testVolumeLockAfterBucketLock() {
-    OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireBucketLock("volOne", "bucketOne");
-    try {
-      lock.acquireVolumeLock("volOne");
-      Assert.fail();
-    } catch (RuntimeException ex) {
-      String msg =
-          "cannot acquire volume lock while holding bucket lock(s).";
-      Assert.assertTrue(ex.getMessage().contains(msg));
-    }
-    lock.releaseBucketLock("volOne", "bucketOne");
-    Assert.assertTrue(true);
-  }
-
-
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org