You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/05/27 12:06:26 UTC

[ozone] branch HDDS-4944 updated: HDDS-6701. [Multi-Tenant] Add proper locking between Ranger background sync service and tenant requests; bug fixes (#3450)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-4944
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4944 by this push:
     new dcd17390be HDDS-6701. [Multi-Tenant] Add proper locking between Ranger background sync service and tenant requests; bug fixes (#3450)
dcd17390be is described below

commit dcd17390bef1b050e0d79aee3d70411aa69238ef
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Fri May 27 12:06:20 2022 +0000

    HDDS-6701. [Multi-Tenant] Add proper locking between Ranger background sync service and tenant requests; bug fixes (#3450)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  12 +
 .../docs/content/feature/S3-Tenant-Commands.md     |   7 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   5 +
 .../hadoop/ozone/om/multitenant/OzoneTenant.java   |  12 +-
 .../ozone/om/multitenant/RangerAccessPolicy.java   |  10 +
 .../apache/hadoop/ozone/om/multitenant/Tenant.java |   4 +-
 .../src/main/compose/ozonesecure/docker-config     |  10 +-
 .../ozonesecure/mockserverInitialization.json      |  24 +
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   3 +-
 .../smoketest/security/ozone-secure-tenant.robot   |   4 +-
 ...estMultiTenantAccessAuthorizerRangerPlugin.java |  11 +-
 .../om/multitenant/TestRangerBGSyncService.java    |  33 +-
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |  40 +-
 .../hadoop/ozone/om/OMMultiTenantManager.java      |  78 +--
 .../hadoop/ozone/om/OMMultiTenantManagerImpl.java  | 753 +++++++++++++--------
 .../java/org/apache/hadoop/ozone/om/TenantOp.java  |  86 +++
 .../ozone/om/multitenant/AuthorizerLock.java       |  87 +++
 .../ozone/om/multitenant/AuthorizerLockImpl.java   | 186 +++++
 .../multitenant/MultiTenantAccessAuthorizer.java   |  56 +-
 .../MultiTenantAccessAuthorizerDummyPlugin.java    |  14 +-
 .../MultiTenantAccessAuthorizerRangerPlugin.java   |  79 ++-
 .../multitenant/MultiTenantAccessController.java   |  11 +-
 .../om/multitenant/OMRangerBGSyncService.java      | 157 +++--
 .../RangerClientMultiTenantAccessController.java   |   9 +-
 .../RangerRestMultiTenantAccessController.java     |   4 +-
 .../om/request/s3/security/OMSetSecretRequest.java |  27 +-
 .../om/request/s3/security/S3GetSecretRequest.java |  25 +-
 .../request/s3/security/S3RevokeSecretRequest.java |  17 +-
 .../request/s3/security/S3SecretRequestHelper.java | 107 +++
 .../s3/tenant/OMTenantAssignAdminRequest.java      |  45 +-
 .../tenant/OMTenantAssignUserAccessIdRequest.java  |  92 +--
 .../request/s3/tenant/OMTenantCreateRequest.java   |  69 +-
 .../request/s3/tenant/OMTenantDeleteRequest.java   |  42 +-
 .../s3/tenant/OMTenantRevokeAdminRequest.java      |  28 +-
 .../tenant/OMTenantRevokeUserAccessIdRequest.java  |  35 +-
 .../hadoop/ozone/om/TestAuthorizerLockImpl.java    | 156 +++++
 .../ozone/om/TestOMMultiTenantManagerImpl.java     |  13 +-
 .../TestMultiTenantAccessController.java           |   7 +-
 .../s3/security/TestS3GetSecretRequest.java        |  12 +-
 .../shell/tenant/TenantAssignAdminHandler.java     |   5 +-
 40 files changed, 1697 insertions(+), 678 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 067777bb79..fcbcff3f55 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -528,4 +528,16 @@ public final class OzoneConsts {
       "/service/plugins/policies/service/";
 
   public static final String OZONE_TENANT_RANGER_POLICY_LABEL = "OzoneTenant";
+
+  /**
+   * The time (in ms) that AuthorizerLock try-lock operations would wait (by
+   * default, some can be overridden) before declaring timeout.
+   */
+  public static final long OZONE_TENANT_AUTHORIZER_LOCK_WAIT_MILLIS = 1000L;
+
+  /**
+   * The maximum length of accessId allowed when assigning new users to a
+   * tenant.
+   */
+  public static final int OZONE_MAXIMUM_ACCESS_ID_LENGTH = 100;
 }
diff --git a/hadoop-hdds/docs/content/feature/S3-Tenant-Commands.md b/hadoop-hdds/docs/content/feature/S3-Tenant-Commands.md
index 9154fc80de..f9ea5f6084 100644
--- a/hadoop-hdds/docs/content/feature/S3-Tenant-Commands.md
+++ b/hadoop-hdds/docs/content/feature/S3-Tenant-Commands.md
@@ -150,12 +150,13 @@ Both delegated and non-delegated tenant admin can assign and revoke **regular**
 The only difference between delegated tenant admin and non-delegated tenant admin is that delegated tenant admin can assign and revoke tenant **admins** in the tenant,
 while non-delegated tenant admin can't.
 
-Unless `--delegated=false` is specified, `ozone tenant assignadmin` assigns **delegated** tenant admins by default.
+By default, `ozone tenant assignadmin` assigns a **non-delegated** tenant admin.
+To assign a **delegated** tenant admin, specify `--delegated` or `-d`.
 
-It is possible to assign a user to be tenant admins in multiple tenants.
+It is possible to assign a user to be tenant admins in multiple tenants. Just a reminder, the user would have a different access ID under each tenant.
 
 ```shell
-ozone tenant user assignadmin <ACCESS_ID> --delegated=true --tenant=<TENANT_NAME>
+ozone tenant user assignadmin <ACCESS_ID> [-d|--delegated] --tenant=<TENANT_NAME>
 ```
 
 Example:
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index db0882c80b..187469c36a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -158,6 +158,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.OLD_QUOTA_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MAXIMUM_ACCESS_ID_LENGTH;
 
 import org.apache.logging.log4j.util.Strings;
 import org.apache.ratis.protocol.ClientId;
@@ -881,6 +882,10 @@ public class RpcClient implements ClientProtocol {
         "tenantId can't be null or empty.");
     Preconditions.checkArgument(Strings.isNotBlank(accessId),
         "accessId can't be null or empty.");
+    Preconditions.checkArgument(
+        accessId.length() <= OZONE_MAXIMUM_ACCESS_ID_LENGTH, "accessId length ("
+            + accessId.length() + ") exceeds the maximum length allowed ("
+            + OZONE_MAXIMUM_ACCESS_ID_LENGTH + ")");
     return ozoneManagerClient.tenantAssignUserAccessId(
         username, tenantId, accessId);
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
index 83ec319d2e..3e26f9d8f0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.ozone.om.multitenant.impl.SingleVolumeTenantNamespace;
  */
 public class OzoneTenant implements Tenant {
   private final String tenantId;
-  private List<String> tenantRoleNames;
-  private List<AccessPolicy> accessPolicies;
+  private final List<String> tenantRoleNames;
+  private final List<AccessPolicy> accessPolicies;
   private final AccountNameSpace accountNameSpace;
   private final BucketNameSpace bucketNameSpace;
 
@@ -85,4 +85,12 @@ public class OzoneTenant implements Tenant {
   public List<String> getTenantRoles() {
     return tenantRoleNames;
   }
+
+  @Override
+  public String toString() {
+    return "OzoneTenant{" + "tenantId='" + tenantId + '\''
+        + ", tenantRoleNames=" + tenantRoleNames + ", accessPolicies="
+        + accessPolicies + ", accountNameSpace=" + accountNameSpace
+        + ", bucketNameSpace=" + bucketNameSpace + '}';
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
index 169d1aa743..c4b80c7e17 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
@@ -303,4 +303,14 @@ public class RangerAccessPolicy implements AccessPolicy {
             + "\"allowExceptions\":[]," + "\"denyPolicyItems\":[],"
             + "\"denyExceptions\":[]," + "\"service\":\"cm_ozone\"" + "}";
   }
+
+  @Override
+  public String toString() {
+    return "RangerAccessPolicy{" + "accessObject=" + accessObject
+        + ", policyMap=" + policyMap + ", roleList=" + roleList + ", policyID='"
+        + policyID + '\'' + ", policyJsonString='" + policyJsonString + '\''
+        + ", policyName='" + policyName + '\''
+        + ", lastPolicyUpdateTimeEpochMillis=" + lastPolicyUpdateTimeEpochMillis
+        + '}';
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
index 73e1f292d0..30424fe8fb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hdds.annotation.InterfaceStability;
 public interface Tenant {
 
   /**
-   * A tenant is represnted by a globally unique TenantID.
-   * @return Tenant-ID.
+   * A tenant is represented by a globally unique tenant name.
+   * @return tenant name.
    */
   String getTenantId();
 
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index f036cf2b8f..d95b640a8a 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -132,7 +132,15 @@ OZONE_LOG_DIR=/var/log/hadoop
 
 no_proxy=om,scm,recon,s3g,kdc,localhost,127.0.0.1
 
-OZONE-SITE.XML_ozone.om.ranger.https-address=http://ranger:6080
 OZONE-SITE.XML_ozone.om.multitenancy.enabled=true
+OZONE-SITE.XML_ozone.om.ranger.https-address=http://ranger:6080
+
 OZONE-SITE.XML_ozone.om.ranger.https.admin.api.user=admin
 OZONE-SITE.XML_ozone.om.ranger.https.admin.api.passwd=passwd
+
+# Note: ozone.om.kerberos.principal and ozone.om.kerberos.keytab.file
+# (which are required for the Multi-Tenancy Ranger Java client) are already
+# properly defined above.
+
+OZONE-SITE.XML_ozone.om.multitenancy.ranger.sync.interval=30s
+OZONE-SITE.XML_ozone.om.multitenancy.ranger.sync.timeout=10s
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json b/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
index 724798270f..8aa8048b0f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
@@ -70,5 +70,29 @@
     "httpResponse": {
       "body": "{\"startIndex\":0,\"pageSize\":200,\"totalCount\":13,\"resultSize\":13,\"sortType\":\"asc\",\"sortBy\":\"serviceId\",\"queryTimeMS\":1651104831041,\"services\":[{\"id\":7,\"guid\":\"b6cbaf6c-3911-4fa6-aeed-60dece4b111b\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1651040438000,\"updateTime\":1651040438000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\ [...]
     }
+  },
+  {
+    "httpRequest": {
+      "path": "/service/plugins/services/7"
+    },
+    "httpResponse": {
+      "body": "{\"id\":7,\"guid\":\"2a83c846-31ed-4882-b987-57a4c7c28867\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1649339219000,\"updateTime\":1649339219000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\"cm_tag\",\"configs\":{\"setup.additional.default.policies\":\"true\",\"hadoop.security.authentication\":\"kerberos\",\"ozone.om.http-address\":\"http://localhos [...]
+    }
+  },
+  {
+    "httpRequest": {
+      "path": "/service/plugins/policies/service/7"
+    },
+    "httpResponse": {
+      "body": "{\"id\":7,\"guid\":\"2a83c846-31ed-4882-b987-57a4c7c28867\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1649339219000,\"updateTime\":1649339219000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\"cm_tag\",\"configs\":{\"setup.additional.default.policies\":\"true\",\"hadoop.security.authentication\":\"kerberos\",\"ozone.om.http-address\":\"http://localhos [...]
+    }
+  },
+  {
+    "httpRequest": {
+      "path": "/service/plugins/policies/444"
+    },
+    "httpResponse": {
+      "body": "{id: 444}"
+    }
   }
 ]
diff --git a/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh b/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
index b5d5285aa9..8bf5fd13fb 100755
--- a/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/upgrades/non-rolling-upgrade/1.1.0-1.2.0/callback.sh
@@ -70,7 +70,8 @@ with_old_version_downgraded() {
 
 with_new_version_finalized() {
   _check_hdds_mlvs 2
-  _check_om_mlvs 1
+  # In Ozone 1.2.0, OM has only one layout version.
+  _check_om_mlvs 0
 
   validate old1
   validate new1
diff --git a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
index 17da56105e..0f77ec3b76 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
@@ -97,11 +97,11 @@ Delete Tenant Failure Tenant Not Empty
 Create Tenant Failure with Regular User
     Run Keyword         Kinit test user     testuser2    testuser2.keytab
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant create tenanttwo
-                        Should contain   ${output}         PERMISSION_DENIED User 'testuser2/scm@EXAMPLE.COM' is not an Ozone admin.
+                        Should contain   ${output}         PERMISSION_DENIED User 'testuser2/scm@EXAMPLE.COM' or 'testuser2' is not an Ozone admin
 
 SetSecret Failure with Regular User
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant user set-secret 'tenantone$testuser' --secret=somesecret2
-                        Should contain   ${output}         Permission denied. Requested accessId
+                        Should contain   ${output}         USER_MISMATCH Requested accessId 'tenantone$testuser' doesn't belong to current user 'testuser2/scm@EXAMPLE.COM', nor does current user have Ozone or tenant administrator privilege
 
 Create Bucket 2 Success with somesecret1 via S3 API
     ${output} =         Execute          aws s3api --endpoint-url ${S3G_ENDPOINT_URL} create-bucket --bucket bucket-test2
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
index dae673fde5..0f9ccb44e5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -105,7 +104,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
       OzoneTenantRolePrincipal userRole =
           new OzoneTenantRolePrincipal("tenant1-UserRole");
 
-      BasicUserPrincipal userPrincipal = new BasicUserPrincipal("user1Test");
+      String userPrincipal = "user1Test";
       usersIdsCreated.add(
           omm.assignUserToRole(userPrincipal, userRole.getName(), false));
       usersIdsCreated.add(
@@ -137,7 +136,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
         omm.deleteUser(id);
       }
       for (String id : groupIdsCreated) {
-        omm.deleteRole(id);
+        omm.deleteRoleById(id);
       }
     }
   }
@@ -146,7 +145,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
   @Ignore("TODO:Requires (mocked) Ranger endpoint")
   public void testMultiTenantAccessAuthorizerRangerPluginWithoutIds()
       throws Exception {
-    BasicUserPrincipal userPrincipal = null;
+    String userPrincipal = null;
     simulateOzoneSiteXmlConfig();
     final MultiTenantAccessAuthorizer omm =
         new MultiTenantAccessAuthorizerRangerPlugin();
@@ -163,7 +162,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
       omm.createRole(group2Principal.getName(), group1Principal.getName());
       groupIdsCreated.add(omm.getRole(group2Principal));
 
-      userPrincipal = new BasicUserPrincipal("user1Test");
+      userPrincipal = "user1Test";
       omm.assignUserToRole(userPrincipal, group2Principal.getName(), false);
 
       AccessPolicy tenant1VolumeAccessPolicy = createVolumeAccessPolicy(
@@ -195,7 +194,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
       String userId = omm.getUserId(userPrincipal);
       omm.deleteUser(userId);
       for (String id : groupIdsCreated) {
-        omm.deleteRole(id);
+        omm.deleteRoleById(id);
       }
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
index 04b633890d..2334829601 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
@@ -69,7 +69,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -123,7 +122,7 @@ public class TestRangerBGSyncService {
   // List of role ID created in Ranger
   private final List<String> rolesCreated = new ArrayList<>();
   // List of users created in Ranger
-  private final List<BasicUserPrincipal> usersCreated = new ArrayList<>();
+  private final List<String> usersCreated = new ArrayList<>();
 
   private static OzoneConfiguration conf;
   private OzoneManager ozoneManager;
@@ -230,6 +229,8 @@ public class TestRangerBGSyncService {
     when(omMultiTenantManager.newDefaultBucketAccessPolicy(eq(TENANT_ID),
         Mockito.any(OzoneTenantRolePrincipal.class)))
         .thenReturn(newBucketAccessPolicy(TENANT_ID, TENANT_ID));
+    when(omMultiTenantManager.getAuthorizerLock())
+        .thenReturn(new AuthorizerLockImpl());
 
     // Raft client request handling
     OzoneManagerRatisServer omRatisServer = mock(OzoneManagerRatisServer.class);
@@ -301,7 +302,8 @@ public class TestRangerBGSyncService {
   }
 
   long initBGSync() throws IOException {
-    bgSync = new OMRangerBGSyncService(ozoneManager, auth,
+    bgSync = new OMRangerBGSyncService(ozoneManager,
+        ozoneManager.getMultiTenantManager(), auth,
         TEST_SYNC_INTERVAL_SEC, TimeUnit.SECONDS, TEST_SYNC_TIMEOUT_SEC);
     return bgSync.getLatestRangerServiceVersion();
   }
@@ -311,8 +313,6 @@ public class TestRangerBGSyncService {
     policiesCreated.clear();
     rolesCreated.clear();
 
-    BasicUserPrincipal userAlice = new BasicUserPrincipal(USER_ALICE_SHORT);
-    BasicUserPrincipal userBob = new BasicUserPrincipal(USER_BOB_SHORT);
     // Tenant name to be used for this test
     final String tenantId = TENANT_ID;
     // volume name = bucket namespace name
@@ -338,21 +338,21 @@ public class TestRangerBGSyncService {
                 bucketNamespacePolicyName, bucketPolicyName));
         // Access ID entry for alice
         final String aliceAccessId = OMMultiTenantManager.getDefaultAccessId(
-            tenantId, userAlice.getName());
+            tenantId, USER_ALICE_SHORT);
         omMetadataManager.getTenantAccessIdTable().put(aliceAccessId,
             new OmDBAccessIdInfo.Builder()
                 .setTenantId(tenantId)
-                .setUserPrincipal(userAlice.getName())
+                .setUserPrincipal(USER_ALICE_SHORT)
                 .setIsAdmin(false)
                 .setIsDelegatedAdmin(false)
                 .build());
         // Access ID entry for bob
         final String bobAccessId = OMMultiTenantManager.getDefaultAccessId(
-            tenantId, userBob.getName());
+            tenantId, USER_BOB_SHORT);
         omMetadataManager.getTenantAccessIdTable().put(bobAccessId,
             new OmDBAccessIdInfo.Builder()
                 .setTenantId(tenantId)
-                .setUserPrincipal(userBob.getName())
+                .setUserPrincipal(USER_BOB_SHORT)
                 .setIsAdmin(false)
                 .setIsDelegatedAdmin(false)
                 .build());
@@ -383,8 +383,8 @@ public class TestRangerBGSyncService {
     try {
       LOG.info("Creating user in Ranger: {}", USER_ALICE_SHORT);
       auth.createUser(USER_ALICE_SHORT, "password1");
-      usersCreated.add(userAlice);
-      auth.assignUserToRole(userAlice, auth.getRole(userRole), false);
+      usersCreated.add(USER_ALICE_SHORT);
+      auth.assignUserToRole(USER_ALICE_SHORT, auth.getRole(userRole), false);
     } catch (Exception e) {
       Assert.fail(e.getMessage());
     }
@@ -392,8 +392,8 @@ public class TestRangerBGSyncService {
     try {
       LOG.info("Creating user in Ranger: {}", USER_BOB_SHORT);
       auth.createUser(USER_BOB_SHORT, "password2");
-      usersCreated.add(userBob);
-      auth.assignUserToRole(userBob, auth.getRole(userRole), false);
+      usersCreated.add(USER_BOB_SHORT);
+      auth.assignUserToRole(USER_BOB_SHORT, auth.getRole(userRole), false);
     } catch (Exception e) {
       Assert.fail(e.getMessage());
     }
@@ -439,7 +439,7 @@ public class TestRangerBGSyncService {
       final String roleName = jObj.get("name").getAsString();
       try {
         LOG.info("Deleting role: {}", roleName);
-        auth.deleteRole(roleId);
+        auth.deleteRoleById(roleId);
       } catch (Exception e) {
         LOG.error(e.getMessage());
       }
@@ -447,7 +447,7 @@ public class TestRangerBGSyncService {
   }
 
   public void cleanupUsers() {
-    for (BasicUserPrincipal user : usersCreated) {
+    for (String user : usersCreated) {
       try {
         LOG.info("Deleting user: {}", user);
         String userId = auth.getUserId(user);
@@ -604,8 +604,7 @@ public class TestRangerBGSyncService {
     Assert.assertEquals(
         OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID), userRoleName);
 
-    auth.revokeUserFromRole(
-        new BasicUserPrincipal(USER_BOB_SHORT), auth.getRole(userRoleName));
+    auth.revokeUserFromRole(USER_BOB_SHORT, auth.getRole(userRoleName));
 
     HashSet<String> userSet = new HashSet<>();
     userSet.add(USER_ALICE_SHORT);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index 2b449906c9..6d1ece7f79 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.shell;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.io.retry.RetryInvocationHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
@@ -197,6 +199,7 @@ public class TestOzoneTenantShell {
         OMTenantAssignUserAccessIdRequest.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(
         MultiTenantAccessAuthorizerRangerPlugin.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(AuthorizerLockImpl.LOG, Level.DEBUG);
   }
 
   @After
@@ -512,6 +515,13 @@ public class TestOzoneTenantShell {
         + "export AWS_SECRET_ACCESS_KEY='", false);
     checkOutput(err, "", true);
 
+    // accessId length exceeding limit, should fail
+    executeHA(tenantShell, new String[] {
+        "user", "assign", StringUtils.repeat('a', 100), "--tenant=dev"});
+    checkOutput(out, "", true);
+    checkOutput(err, "accessId length (104) exceeds the maximum length "
+        + "allowed (100)\n", true);
+
     // Get user info
     // Equivalent to `ozone tenant user info bob`
     executeHA(tenantShell, new String[] {
@@ -524,7 +534,7 @@ public class TestOzoneTenantShell {
 
     // Assign admin
     executeHA(tenantShell, new String[] {
-        "user", "assign-admin", "dev$bob", "--tenant=dev"});
+        "user", "assign-admin", "dev$bob", "--tenant=dev", "--delegated"});
     checkOutput(out, "", true);
     checkOutput(err, "", true);
 
@@ -822,13 +832,9 @@ public class TestOzoneTenantShell {
           "--secret=somesecret2"});
       Assert.assertTrue("Should return non-zero exit code!", exitC != 0);
       checkOutput(out, "", true);
-      checkOutput(err, "Permission denied. Requested accessId "
-          + "'tenant-test-set-secret$alice' and user doesn't satisfy any of:\n"
-          + "1) accessId match current username: 'bob';\n"
-          + "2) is an OM admin;\n"
-          + "3) user is assigned to a tenant under this accessId;\n"
-          + "4) user is an admin of the tenant where the accessId is "
-          + "assigned\n", true);
+      checkOutput(err, "Requested accessId 'tenant-test-set-secret$alice'"
+          + " doesn't belong to current user 'bob', nor does current user"
+          + " have Ozone or tenant administrator privilege\n", true);
       return null;
     });
 
@@ -1058,4 +1064,22 @@ public class TestOzoneTenantShell {
     checkOutput(err, "Deleted tenant '" + tenantName + "'.\n", false);
     deleteVolume(tenantName);
   }
+
+  @Test
+  public void testCreateTenantOnExistingVolumeShouldFail() throws IOException {
+    final String testVolume = "existing-volume-1";
+    int exitC = execute(ozoneSh, new String[] {"volume", "create", testVolume});
+    // Volume create should succeed
+    Assert.assertEquals(0, exitC);
+    checkOutput(out, "", true);
+    checkOutput(err, "", true);
+
+    // Try to create tenant on the same volume, should fail
+    executeHA(tenantShell, new String[] {"create", testVolume});
+    checkOutput(out, "", true);
+    checkOutput(err, "Volume already exists\n", true);
+
+    // Clean up
+    deleteVolume(testVolume);
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
index 96ac1e45aa..48640c31f7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
@@ -21,18 +21,20 @@ import java.io.IOException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
 import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.multitenant.OzoneTenantRolePrincipal;
 import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.slf4j.Logger;
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED;
@@ -45,6 +47,8 @@ import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENAN
 /**
  * OM MultiTenant manager interface.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public interface OMMultiTenantManager {
   /* TODO: Outdated
    * Init multi-tenant manager. Performs initialization e.g.
@@ -83,55 +87,9 @@ public interface OMMultiTenantManager {
    */
   OMMetadataManager getOmMetadataManager();
 
-  /**
-   * Given a TenantID String, Create and return Tenant Interface.
-   *
-   * @param tenantID
-   * @param userRoleName
-   * @param adminRoleName
-   * @return Tenant interface.
-   */
-  Tenant createTenantAccessInAuthorizer(String tenantID, String userRoleName,
-      String adminRoleName) throws IOException;
-
-  /**
-   * Given a TenantID, destroys all state associated with that tenant.
-   * This is different from deactivateTenant() above.
-   * @param tenant
-   * @return
-   * @throws IOException
-   */
-  void removeTenantAccessFromAuthorizer(Tenant tenant) throws Exception;
-
-
-  /**
-   * Creates a new user that exists for S3 API access to Ozone.
-   * @param principal
-   * @param tenantId
-   * @param accessId
-   * @return Unique UserID.
-   * @throws IOException if there is any error condition detected.
-   */
-  String assignUserToTenant(BasicUserPrincipal principal, String tenantId,
-                            String accessId) throws IOException;
-
-  /**
-   * Revoke user accessId.
-   * @param accessId
-   * @throws IOException
-   */
-  void revokeUserAccessId(String accessId) throws IOException;
+  TenantOp getAuthorizerOp();
 
-  /**
-   * A placeholder method to remove a failed-to-assign accessId from
-   * tenantCache.
-   * Triggered in OMAssignUserToTenantRequest#handleRequestFailure.
-   * Most likely becomes unnecessary if we move OMMTM call to the end of the
-   * request (current it runs in preExecute).
-   * TODO: Remove this if unneeded when Ranger thread patch lands.
-   */
-  void removeUserAccessIdFromCache(String accessId, String userPrincipal,
-                                   String tenantId);
+  TenantOp getCacheOp();
 
   /**
    * Given an accessId, return kerberos user name for the tenant user.
@@ -210,18 +168,6 @@ public interface OMMultiTenantManager {
     return tenantId + OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;
   }
 
-  /**
-   * Given a user, make him an admin of the corresponding Tenant.
-   * @param accessID
-   * @param delegated
-   */
-  void assignTenantAdmin(String accessID, boolean delegated) throws IOException;
-
-  /**
-   * Given a user, remove him as admin of the corresponding Tenant.
-   */
-  void revokeTenantAdmin(String accessID) throws IOException;
-
   /**
    * Passes check only when caller is an Ozone (cluster) admin, throws
    * OMException otherwise.
@@ -267,6 +213,14 @@ public interface OMMultiTenantManager {
    */
   String getTenantAdminRoleName(String tenantId) throws IOException;
 
+  /**
+   * Get Tenant object of given tenant name from OM DB.
+   * @param tenantId tenant name
+   * @return Tenant
+   * @throws IOException
+   */
+  Tenant getTenantFromDBById(String tenantId) throws IOException;
+
   boolean isUserAccessIdPrincipalOrTenantAdmin(String accessId,
       UserGroupInformation ugi) throws IOException;
 
@@ -362,4 +316,6 @@ public interface OMMultiTenantManager {
    */
   AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
       OzoneTenantRolePrincipal userRole) throws IOException;
+
+  AuthorizerLock getAuthorizerLock();
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
index 36f902587b..520c2041bc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -63,15 +64,17 @@ import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.multitenant.BucketNameSpace;
 import org.apache.hadoop.ozone.om.multitenant.CachedTenantState;
 import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
-import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
-import org.apache.hadoop.ozone.om.multitenant.OzoneTenant;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizer;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerDummyPlugin;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerRangerPlugin;
+import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.multitenant.OzoneOwnerPrincipal;
+import org.apache.hadoop.ozone.om.multitenant.OzoneTenant;
 import org.apache.hadoop.ozone.om.multitenant.OzoneTenantRolePrincipal;
 import org.apache.hadoop.ozone.om.multitenant.RangerAccessPolicy;
 import org.apache.hadoop.ozone.om.multitenant.Tenant;
@@ -79,7 +82,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserAcc
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +100,6 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   public static final String OZONE_OM_TENANT_DEV_SKIP_RANGER =
       "ozone.om.tenant.dev.skip.ranger";
 
-  private MultiTenantAccessAuthorizer authorizer;
   private final OzoneManager ozoneManager;
   private final OMMetadataManager omMetadataManager;
   private final OzoneConfiguration conf;
@@ -106,6 +107,16 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   private final Map<String, CachedTenantState> tenantCache;
   private final ReentrantReadWriteLock tenantCacheLock;
   private final OMRangerBGSyncService omRangerBGSyncService;
+  private MultiTenantAccessAuthorizer authorizer;
+  private final AuthorizerLock authorizerLock;
+  /**
+   * Authorizer operations. Meant to be called in tenant preExecute.
+   */
+  private final TenantOp authorizerOp;
+  /**
+   * Cache operations. Meant to be called in tenant validateAndUpdateCache.
+   */
+  private final TenantOp cacheOp;
 
   public OMMultiTenantManagerImpl(OzoneManager ozoneManager,
                                   OzoneConfiguration conf)
@@ -115,6 +126,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     this.omMetadataManager = ozoneManager.getMetadataManager();
     this.tenantCache = new ConcurrentHashMap<>();
     this.tenantCacheLock = new ReentrantReadWriteLock();
+    this.authorizerLock = new AuthorizerLockImpl();
 
     loadTenantCacheFromDB();
 
@@ -137,6 +149,9 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       }
     }
 
+    cacheOp = new CacheOp(tenantCache, tenantCacheLock);
+    authorizerOp = new AuthorizerOp(authorizer, tenantCache, tenantCacheLock);
+
     // Define the internal time unit for the config
     final TimeUnit internalTimeUnit = TimeUnit.SECONDS;
     // Get the interval in internal time unit
@@ -152,8 +167,9 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
         OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT.getUnit(),
         internalTimeUnit);
     // Initialize the Ranger Sync Thread
-    omRangerBGSyncService = new OMRangerBGSyncService(ozoneManager, authorizer,
-        rangerSyncInterval, internalTimeUnit, rangerSyncTimeout);
+    omRangerBGSyncService = new OMRangerBGSyncService(ozoneManager, this,
+        authorizer, rangerSyncInterval, internalTimeUnit, rangerSyncTimeout);
+
     // Start the Ranger Sync Thread
     this.start();
   }
@@ -183,246 +199,453 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return omMetadataManager;
   }
 
-  // TODO: Cleanup up this Java doc.
+  @Override
+  public TenantOp getAuthorizerOp() {
+    return authorizerOp;
+  }
+
+  @Override
+  public TenantOp getCacheOp() {
+    return cacheOp;
+  }
+
   /**
-   *  Algorithm
-   *  OM State :
-   *    - Validation (Part of Ratis Request)
-   *    - create volume {Part of RATIS request}
-   *    - Persistence to OM DB {Part of RATIS request}
-   *  Authorizer-plugin(Ranger) State :
-   *    - For every tenant create two user groups
-   *        # GroupTenantAllUsers
-   *        # GroupTenantAllAdmins
-   *
-   *    - For every tenant create two default policies
-   *    - Note: plugin states are made idempotent. Onus of returning EEXIST is
-   *      part of validation in Ratis-Request. if the groups/policies exist
-   *      with the same name (Due to an earlier failed/success request), in
-   *      plugin, we just update in-memory-map here and return success.
-   *    - The job of cleanup of any half-done authorizer-plugin state is done
-   *      by a background thread.
-   *  Finally :
-   *    - Update all Maps maintained by Multi-Tenant-Manager
-   *  In case of failure :
-   *    - Undo all Ranger State
-   *    - remove updates to the Map
-   *  Locking :
-   *    - Create/Manage Tenant/User operations are control path operations.
-   *      We can do all of this as part of holding a coarse lock and synchronize
-   *      these control path operations.
-   *
-   * @param tenantId
-   * @param userRoleName
-   * @param adminRoleName
-   * @return Tenant
-   * @throws IOException
+   * Implements tenant authorizer operations.
    */
-  @Override
-  public Tenant createTenantAccessInAuthorizer(String tenantId,
-      String userRoleName, String adminRoleName)
-      throws IOException {
+  public class AuthorizerOp implements TenantOp {
+
+    private final MultiTenantAccessAuthorizer authorizer;
+    private final Map<String, CachedTenantState> tenantCache;
+    private final ReentrantReadWriteLock tenantCacheLock;
+
+    AuthorizerOp(MultiTenantAccessAuthorizer authorizer,
+        Map<String, CachedTenantState> tenantCache,
+        ReentrantReadWriteLock tenantCacheLock) {
+      this.authorizer = authorizer;
+      this.tenantCache = tenantCache;
+      this.tenantCacheLock = tenantCacheLock;
+    }
 
-    Tenant tenant = new OzoneTenant(tenantId);
-    try {
-      tenantCacheLock.writeLock().lock();
+    /**
+     * Throws if authorizer write lock hasn't been acquired.
+     */
+    private void checkAcquiredAuthorizerWriteLock() throws OMException {
 
-      // Create admin role first
-      String adminRoleId = authorizer.createRole(adminRoleName, null);
-      tenant.addTenantAccessRole(adminRoleId);
-
-      // Then create user role, and add admin role as its delegated admin
-      String userRoleId = authorizer.createRole(userRoleName, adminRoleName);
-      tenant.addTenantAccessRole(userRoleId);
-
-      BucketNameSpace bucketNameSpace = tenant.getTenantBucketNameSpace();
-      // Bucket namespace is volume
-      for (OzoneObj volume : bucketNameSpace.getBucketNameSpaceObjects()) {
-        String volumeName = volume.getVolumeName();
-
-        final OzoneTenantRolePrincipal userRole =
-            new OzoneTenantRolePrincipal(userRoleName);
-        final OzoneTenantRolePrincipal adminRole =
-            new OzoneTenantRolePrincipal(adminRoleName);
-
-        // Allow Volume List access
-        AccessPolicy tenantVolumeAccessPolicy = newDefaultVolumeAccessPolicy(
-            volumeName, userRole, adminRole);
-        tenantVolumeAccessPolicy.setPolicyID(
-            authorizer.createAccessPolicy(tenantVolumeAccessPolicy));
-        tenant.addTenantAccessPolicy(tenantVolumeAccessPolicy);
-
-        // Allow Bucket Create within Volume
-        AccessPolicy tenantBucketCreatePolicy =
-            newDefaultBucketAccessPolicy(volumeName, userRole);
-        tenantBucketCreatePolicy.setPolicyID(
-            authorizer.createAccessPolicy(tenantBucketCreatePolicy));
-        tenant.addTenantAccessPolicy(tenantBucketCreatePolicy);
+      // Check if lock is acquired by the current thread
+      if (!authorizerLock.isWriteLockHeldByCurrentThread()) {
+        throw new OMException("Authorizer write lock must have been held "
+            + "before calling this", INTERNAL_ERROR);
       }
+    }
 
-      if (tenantCache.containsKey(tenantId)) {
-        LOG.warn("Cache entry for tenant '{}' somehow already exists, "
-            + "will be overwritten", tenantId);  // TODO: throw exception?
+    /**
+     *  Algorithm
+     *  OM State :
+     *    - Validation (Part of Ratis Request)
+     *    - create volume {Part of RATIS request}
+     *    - Persistence to OM DB {Part of RATIS request}
+     *  Authorizer-plugin(Ranger) State :
+     *    - For every tenant create two user groups
+     *        # GroupTenantAllUsers
+     *        # GroupTenantAllAdmins
+     *
+     *    - For every tenant create two default policies
+     *    - Note: plugin states are made idempotent. Onus of returning EEXIST is
+     *      part of validation in Ratis-Request. if the groups/policies exist
+     *      with the same name (Due to an earlier failed/success request), in
+     *      plugin, we just update in-memory-map here and return success.
+     *    - The job of cleanup of any half-done authorizer-plugin state is done
+     *      by a background thread.
+     *  Finally :
+     *    - Update all Maps maintained by Multi-Tenant-Manager
+     *  In case of failure :
+     *    - Undo all Ranger State
+     *    - remove updates to the Map
+     *  Locking :
+     *    - Create/Manage Tenant/User operations are control path operations.
+     *      We can do all of this as part of holding a coarse lock and
+     *      synchronize these control path operations.
+     *
+     * @param tenantId tenant name
+     * @param userRoleName user role name
+     * @param adminRoleName admin role name
+     * @return Tenant
+     * @throws IOException
+     */
+    @Override
+    public void createTenant(
+        String tenantId, String userRoleName, String adminRoleName)
+        throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      Tenant tenant = new OzoneTenant(tenantId);
+
+      try {
+        // Create admin role first
+        String adminRoleId = authorizer.createRole(adminRoleName, null);
+        tenant.addTenantAccessRole(adminRoleId);
+
+        // Then create user role, and add admin role as its delegated admin
+        String userRoleId = authorizer.createRole(userRoleName, adminRoleName);
+        tenant.addTenantAccessRole(userRoleId);
+
+        BucketNameSpace bucketNameSpace = tenant.getTenantBucketNameSpace();
+        // Bucket namespace is volume
+        for (OzoneObj volume : bucketNameSpace.getBucketNameSpaceObjects()) {
+          String volumeName = volume.getVolumeName();
+
+          final OzoneTenantRolePrincipal userRole =
+              new OzoneTenantRolePrincipal(userRoleName);
+          final OzoneTenantRolePrincipal adminRole =
+              new OzoneTenantRolePrincipal(adminRoleName);
+
+          // Allow Volume List access
+          AccessPolicy tenantVolumeAccessPolicy = newDefaultVolumeAccessPolicy(
+              volumeName, userRole, adminRole);
+          tenantVolumeAccessPolicy.setPolicyID(
+              authorizer.createAccessPolicy(tenantVolumeAccessPolicy));
+          tenant.addTenantAccessPolicy(tenantVolumeAccessPolicy);
+
+          // Allow Bucket Create within Volume
+          AccessPolicy tenantBucketCreatePolicy =
+              newDefaultBucketAccessPolicy(volumeName, userRole);
+          tenantBucketCreatePolicy.setPolicyID(
+              authorizer.createAccessPolicy(tenantBucketCreatePolicy));
+          tenant.addTenantAccessPolicy(tenantBucketCreatePolicy);
+        }
+
+        // Does NOT update tenant cache here
+      } catch (IOException e) {
+        // Expect the sync thread to restore the admin role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
       }
+    }
 
-      // TODO: Move tenantCache update to a separate call createTenantAccessInDB
-      //  createTenantAccessInAuthorizer is called preExecute to update Ranger
-      //  createTenantAccessInDB will be called in validateAndUpdateCache
-      //  Do the same to all other InAuthorizer calls as well.
-      // New entry in tenant cache
-      tenantCache.put(tenantId, new CachedTenantState(
-          tenantId, userRoleName, adminRoleName));
+    @Override
+    public void deleteTenant(Tenant tenant) throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      LOG.info("Deleting tenant policies and roles from Ranger: {}", tenant);
 
-    } catch (IOException e) {
       try {
-        removeTenantAccessFromAuthorizer(tenant);
-      } catch (IOException ignored) {
-        // Best effort cleanup.
+        for (AccessPolicy policy : tenant.getTenantAccessPolicies()) {
+          authorizer.deletePolicyByName(policy.getPolicyName());
+        }
+
+        for (String roleName : tenant.getTenantRoles()) {
+          authorizer.deleteRoleByName(roleName);
+        }
+      } catch (IOException e) {
+        // Expect the sync thread to restore the admin role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
       }
-      throw e;
-    } finally {
-      tenantCacheLock.writeLock().unlock();
     }
-    return tenant;
-  }
 
-  @Override
-  public void removeTenantAccessFromAuthorizer(Tenant tenant)
-      throws IOException {
-    try {
-      tenantCacheLock.writeLock().lock();
-      for (AccessPolicy policy : tenant.getTenantAccessPolicies()) {
-        authorizer.deletePolicyById(policy.getPolicyID());
+    /**
+     *  Algorithm
+     *  Authorizer-plugin(Ranger) State :
+     *    - create User in Ranger DB
+     *    - For every user created
+     *        Add them to # GroupTenantAllUsers
+     *  In case of failure :
+     *    - Undo all Ranger State
+     *    - remove updates to the Map
+     *  Locking :
+     *    - Create/Manage Tenant/User operations are control path operations.
+     *      We can do all of this as part of holding a coarse lock and
+     *      synchronize these control path operations.
+     *
+     * @param userPrincipal
+     * @param tenantId
+     * @param accessId
+     * @throws IOException
+     */
+    @Override
+    public void assignUserToTenant(String userPrincipal,
+        String tenantId, String accessId) throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      tenantCacheLock.readLock().lock();
+      try {
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+        Preconditions.checkNotNull(cachedTenantState,
+            "Cache entry for tenant '" + tenantId + "' does not exist");
+
+        // Does NOT update tenant cache here
+
+        final String tenantUserRoleName =
+            tenantCache.get(tenantId).getTenantUserRoleName();
+        final OzoneTenantRolePrincipal tenantUserRolePrincipal =
+            new OzoneTenantRolePrincipal(tenantUserRoleName);
+        String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
+        final String roleId =
+            authorizer.assignUserToRole(userPrincipal, roleJsonStr, false);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("roleId that the user is assigned to: {}", roleId);
+        }
+
+      } catch (IOException e) {
+        // Expect the sync thread to restore the user role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
+      } finally {
+        tenantCacheLock.readLock().unlock();
       }
-      for (String roleId : tenant.getTenantRoles()) {
-        authorizer.deleteRole(roleId);
+    }
+
+    @Override
+    public void revokeUserAccessId(String accessId, String tenantId)
+        throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      tenantCacheLock.readLock().lock();
+      try {
+        final OmDBAccessIdInfo omDBAccessIdInfo =
+            omMetadataManager.getTenantAccessIdTable().get(accessId);
+        if (omDBAccessIdInfo == null) {
+          throw new OMException(INVALID_ACCESS_ID);
+        }
+        final String tenantIdGot = omDBAccessIdInfo.getTenantId();
+        Preconditions.checkArgument(tenantIdGot.equals(tenantId));
+
+        final String userPrincipal = omDBAccessIdInfo.getUserPrincipal();
+
+        // Delete user from role in Ranger
+        final String tenantUserRoleName =
+            tenantCache.get(tenantId).getTenantUserRoleName();
+        final OzoneTenantRolePrincipal tenantUserRolePrincipal =
+            new OzoneTenantRolePrincipal(tenantUserRoleName);
+        String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
+        final String roleId =
+            authorizer.revokeUserFromRole(userPrincipal, roleJsonStr);
+        Preconditions.checkNotNull(roleId);
+
+        // Does NOT update tenant cache here
+      } catch (IOException e) {
+        // Expect the sync thread to restore the user role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
+      } finally {
+        tenantCacheLock.readLock().unlock();
       }
-      if (tenantCache.containsKey(tenant.getTenantId())) {
-        LOG.info("Removing tenant {} from in memory cached state",
-            tenant.getTenantId());
-        tenantCache.remove(tenant.getTenantId());
+    }
+
+    @Override
+    public void assignTenantAdmin(String accessId, boolean delegated)
+        throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      tenantCacheLock.readLock().lock();
+      try {
+        // tenant name is needed to retrieve role name
+        final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
+
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+
+        final String tenantAdminRoleName =
+            cachedTenantState.getTenantAdminRoleName();
+        final OzoneTenantRolePrincipal existingAdminRole =
+            new OzoneTenantRolePrincipal(tenantAdminRoleName);
+
+        final String roleJsonStr = authorizer.getRole(existingAdminRole);
+        final String userPrincipal = getUserNameGivenAccessId(accessId);
+        // Update Ranger. Add user principal (not accessId!) to the role
+        final String roleId = authorizer.assignUserToRole(
+            userPrincipal, roleJsonStr, delegated);
+        assert (roleId != null);
+
+        // Does NOT update tenant cache here
+      } catch (IOException e) {
+        // Expect the sync thread to restore the admin role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
+      } finally {
+        tenantCacheLock.readLock().unlock();
+      }
+    }
+
+    @Override
+    public void revokeTenantAdmin(String accessId)
+        throws IOException {
+
+      checkAcquiredAuthorizerWriteLock();
+
+      tenantCacheLock.readLock().lock();
+      try {
+        // tenant name is needed to retrieve role name
+        final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
+
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+        final String tenantAdminRoleName =
+            cachedTenantState.getTenantAdminRoleName();
+        final OzoneTenantRolePrincipal existingAdminRole =
+            new OzoneTenantRolePrincipal(tenantAdminRoleName);
+
+        final String roleJsonStr = authorizer.getRole(existingAdminRole);
+        final String userPrincipal = getUserNameGivenAccessId(accessId);
+        // Update Ranger. Add user principal (not accessId!) to the role
+        final String roleId = authorizer.revokeUserFromRole(
+            userPrincipal, roleJsonStr);
+        assert (roleId != null);
+
+        // Does NOT update tenant cache here
+      } catch (IOException e) {
+        // Expect the sync thread to restore the admin role later if op succeeds
+        throw new OMException(e, TENANT_AUTHORIZER_ERROR);
+      } finally {
+        tenantCacheLock.readLock().unlock();
       }
-    } finally {
-      tenantCacheLock.writeLock().unlock();
     }
+
   }
 
   /**
-   *  Algorithm
-   *  Authorizer-plugin(Ranger) State :
-   *    - create User in Ranger DB
-   *    - For every user created
-   *        Add them to # GroupTenantAllUsers
-   *  In case of failure :
-   *    - Undo all Ranger State
-   *    - remove updates to the Map
-   *  Locking :
-   *    - Create/Manage Tenant/User operations are control path operations.
-   *      We can do all of this as part of holding a coarse lock and synchronize
-   *      these control path operations.
-   *
-   * @param principal
-   * @param tenantId
-   * @param accessId
-   * @return Tenant, or null on error
-   * @throws IOException
+   * Implements tenant cache operations.
    */
-  @Override
-  public String assignUserToTenant(BasicUserPrincipal principal,
-      String tenantId, String accessId) throws IOException {
+  public class CacheOp implements TenantOp {
 
-    final CachedAccessIdInfo cacheEntry =
-        new CachedAccessIdInfo(principal.getName(), false);
+    private final Map<String, CachedTenantState> tenantCache;
+    private final ReentrantReadWriteLock tenantCacheLock;
+
+    CacheOp(Map<String, CachedTenantState> tenantCache,
+        ReentrantReadWriteLock tenantCacheLock) {
+      this.tenantCache = tenantCache;
+      this.tenantCacheLock = tenantCacheLock;
+    }
+
+    @Override
+    public void createTenant(
+        String tenantId, String userRoleName, String adminRoleName) {
 
-    try {
       tenantCacheLock.writeLock().lock();
+      try {
+        if (tenantCache.containsKey(tenantId)) {
+          LOG.warn("Cache entry for tenant '{}' already exists, "
+              + "will be overwritten", tenantId);
+        }
 
-      CachedTenantState cachedTenantState = tenantCache.get(tenantId);
-      Preconditions.checkNotNull(cachedTenantState,
-          "Cache entry for tenant '" + tenantId + "' does not exist");
-
-      LOG.info("Adding user '{}' access ID '{}' to tenant '{}' in-memory cache",
-          principal.getName(), accessId, tenantId);
-      cachedTenantState.getAccessIdInfoMap().put(accessId, cacheEntry);
-
-      final String tenantUserRoleName =
-          tenantCache.get(tenantId).getTenantUserRoleName();
-      final OzoneTenantRolePrincipal tenantUserRolePrincipal =
-          new OzoneTenantRolePrincipal(tenantUserRoleName);
-      String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
-      final String roleId =
-          authorizer.assignUserToRole(principal, roleJsonStr, false);
-      return roleId;
-    } catch (IOException e) {
-      // Clean up
-      revokeUserAccessId(accessId);
-      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
+        // New entry in tenant cache
+        tenantCache.put(tenantId, new CachedTenantState(
+            tenantId, userRoleName, adminRoleName));
+      } finally {
+        tenantCacheLock.writeLock().unlock();
+      }
+    }
 
-      throw new OMException(e.getMessage(), TENANT_AUTHORIZER_ERROR);
-    } finally {
-      tenantCacheLock.writeLock().unlock();
+    @Override
+    public void deleteTenant(Tenant tenant) throws IOException {
+
+      final String tenantId = tenant.getTenantId();
+
+      tenantCacheLock.writeLock().lock();
+      try {
+        if (tenantCache.containsKey(tenantId)) {
+          LOG.info("Removing tenant from in-memory cache: {}", tenantId);
+          tenantCache.remove(tenantId);
+        } else {
+          throw new OMException("Tenant does not exist in cache: " + tenantId,
+              INTERNAL_ERROR);
+        }
+      } finally {
+        tenantCacheLock.writeLock().unlock();
+      }
     }
-  }
 
-  @Override
-  public void revokeUserAccessId(String accessId) throws IOException {
-    try {
+    @Override
+    public void assignUserToTenant(String userPrincipal,
+        String tenantId, String accessId) {
+
+      final CachedAccessIdInfo cacheEntry =
+          new CachedAccessIdInfo(userPrincipal, false);
+
       tenantCacheLock.writeLock().lock();
-      final OmDBAccessIdInfo omDBAccessIdInfo =
-          omMetadataManager.getTenantAccessIdTable().get(accessId);
-      if (omDBAccessIdInfo == null) {
-        throw new OMException(INVALID_ACCESS_ID);
+      try {
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+        Preconditions.checkNotNull(cachedTenantState,
+            "Cache entry for tenant '" + tenantId + "' does not exist");
+
+        LOG.info("Adding to cache: user '{}' accessId '{}' in tenant '{}'",
+            userPrincipal, accessId, tenantId);
+        cachedTenantState.getAccessIdInfoMap().put(accessId, cacheEntry);
+      } finally {
+        tenantCacheLock.writeLock().unlock();
       }
-      final String tenantId = omDBAccessIdInfo.getTenantId();
-      if (tenantId == null) {
-        LOG.error("Tenant doesn't exist");
-        return;
+    }
+
+    @Override
+    public void revokeUserAccessId(String accessId, String tenantId)
+        throws IOException {
+
+      tenantCacheLock.writeLock().lock();
+      try {
+        LOG.info("Removing from cache: accessId '{}' in tenant '{}'",
+            accessId, tenantId);
+        if (!tenantCache.get(tenantId).getAccessIdInfoMap()
+            .containsKey(accessId)) {
+          throw new OMException("accessId '" + accessId + "' doesn't exist "
+              + "in tenant cache!", INTERNAL_ERROR);
+        }
+        tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
+      } finally {
+        tenantCacheLock.writeLock().unlock();
       }
+    }
 
-      final BasicUserPrincipal principal =
-          new BasicUserPrincipal(omDBAccessIdInfo.getUserPrincipal());
-
-      LOG.info("Removing user '{}' access ID '{}' from tenant '{}' in-memory "
-              + "cache",
-          principal.getName(), accessId, tenantId);
-      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
-
-      // Delete user from role in Ranger
-      final String tenantUserRoleName =
-          tenantCache.get(tenantId).getTenantUserRoleName();
-      final OzoneTenantRolePrincipal tenantUserRolePrincipal =
-          new OzoneTenantRolePrincipal(tenantUserRoleName);
-      String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
-      final String roleId =
-          authorizer.revokeUserFromRole(principal, roleJsonStr);
-      Preconditions.checkNotNull(roleId);
-    } finally {
-      tenantCacheLock.writeLock().unlock();
+    /**
+     * This should be called in validateAndUpdateCache after
+     * the InAuthorizer variant (called in preExecute).
+     */
+    @Override
+    public void assignTenantAdmin(String accessId, boolean delegated)
+        throws IOException {
+
+      tenantCacheLock.writeLock().lock();
+      try {
+        // tenant name is needed to retrieve role name
+        final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+
+        LOG.info("Updating cache: accessId '{}' isAdmin '{}' isDelegated '{}'",
+            accessId, true, delegated);
+        // Update cache. Note: tenant cache does not store delegated flag yet.
+        cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(true);
+      } finally {
+        tenantCacheLock.writeLock().unlock();
+      }
     }
-  }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void removeUserAccessIdFromCache(String accessId, String userPrincipal,
-                                          String tenantId) {
+    @Override
+    public void revokeTenantAdmin(String accessId) throws IOException {
 
-    tenantCacheLock.writeLock().lock();
-    try {
-      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
-    } catch (NullPointerException e) {
-      // tenantCache is somehow empty. Ignore for now.
-      LOG.warn("Exception when removing accessId from cache", e);
-    } finally {
-      tenantCacheLock.writeLock().unlock();
+      tenantCacheLock.writeLock().lock();
+      try {
+        // tenant name is needed to retrieve role name
+        final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
+
+        final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+
+        LOG.info("Updating cache: accessId '{}' isAdmin '{}' isDelegated '{}'",
+            accessId, false, false);
+        // Update cache
+        cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(false);
+
+      } finally {
+        tenantCacheLock.writeLock().unlock();
+      }
     }
+
   }
 
   @Override
   public String getUserNameGivenAccessId(String accessId) {
+
     Preconditions.checkNotNull(accessId);
+
+    tenantCacheLock.readLock().lock();
     try {
-      tenantCacheLock.readLock().lock();
       OmDBAccessIdInfo omDBAccessIdInfo =
           omMetadataManager.getTenantAccessIdTable().get(accessId);
       if (omDBAccessIdInfo != null) {
@@ -550,76 +773,19 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return Optional.of(omDBAccessIdInfo.getTenantId());
   }
 
-  @Override
-  public void assignTenantAdmin(String accessId, boolean delegated)
+  /**
+   * Internal helper method that gets tenant name from accessId.
+   * Throws if not found.
+   */
+  private String getTenantForAccessIDThrowIfNotFound(String accessId)
       throws IOException {
-    try {
-      tenantCacheLock.writeLock().lock();
-
-      // tenantId (tenant name) is necessary to retrieve role name
-      Optional<String> optionalTenant = getTenantForAccessID(accessId);
-      if (!optionalTenant.isPresent()) {
-        throw new OMException("No tenant found for access ID " + accessId,
-            INVALID_ACCESS_ID);
-      }
-      final String tenantId = optionalTenant.get();
-
-      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
-      final String tenantAdminRoleName =
-          cachedTenantState.getTenantAdminRoleName();
-      final OzoneTenantRolePrincipal existingAdminRole =
-          new OzoneTenantRolePrincipal(tenantAdminRoleName);
-
-      final String roleJsonStr = authorizer.getRole(existingAdminRole);
-      final String userPrincipal = getUserNameGivenAccessId(accessId);
-      // Add user principal (not accessId!) to the role
-      final String roleId = authorizer.assignUserToRole(
-          new BasicUserPrincipal(userPrincipal), roleJsonStr, delegated);
-      assert (roleId != null);
-
-      // Update cache
-      cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(true);
-
-    } catch (IOException e) {
-      revokeTenantAdmin(accessId);
-      throw e;
-    } finally {
-      tenantCacheLock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void revokeTenantAdmin(String accessId) throws IOException {
-    try {
-      tenantCacheLock.writeLock().lock();
-
-      // tenantId (tenant name) is necessary to retrieve role name
-      Optional<String> optionalTenant = getTenantForAccessID(accessId);
-      if (!optionalTenant.isPresent()) {
-        throw new OMException("No tenant found for access ID " + accessId,
-            INVALID_ACCESS_ID);
-      }
-      final String tenantId = optionalTenant.get();
-
-      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
-      final String tenantAdminRoleName =
-          cachedTenantState.getTenantAdminRoleName();
-      final OzoneTenantRolePrincipal existingAdminRole =
-          new OzoneTenantRolePrincipal(tenantAdminRoleName);
-
-      final String roleJsonStr = authorizer.getRole(existingAdminRole);
-      final String userPrincipal = getUserNameGivenAccessId(accessId);
-      // Add user principal (not accessId!) to the role
-      final String roleId = authorizer.revokeUserFromRole(
-          new BasicUserPrincipal(userPrincipal), roleJsonStr);
-      assert (roleId != null);
 
-      // Update cache
-      cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(false);
-
-    } finally {
-      tenantCacheLock.writeLock().unlock();
+    final Optional<String> optionalTenant = getTenantForAccessID(accessId);
+    if (!optionalTenant.isPresent()) {
+      throw new OMException("No tenant found for access ID: " + accessId,
+          INVALID_ACCESS_ID);
     }
+    return optionalTenant.get();
   }
 
   public AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
@@ -731,8 +897,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
 
     final UserGroupInformation ugi = ProtobufRpcEngine.Server.getRemoteUser();
     if (!ozoneManager.isAdmin(ugi)) {
-      throw new OMException("User '" + ugi.getUserName() +
-          "' is not an Ozone admin.",
+      throw new OMException("User '" + ugi.getUserName() + "' or '" +
+          ugi.getShortUserName() + "' is not an Ozone admin",
           OMException.ResultCodes.PERMISSION_DENIED);
     }
   }
@@ -780,17 +946,15 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
         omMetadataManager.getTenantStateTable().get(tenantId);
 
     if (tenantState == null) {
-      throw new OMException("Potential DB error or race condition. "
-          + "OmDBTenantState entry is missing for tenant '" + tenantId + "'.",
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
           OMException.ResultCodes.TENANT_NOT_FOUND);
     }
 
     final String volumeName = tenantState.getBucketNamespaceName();
 
     if (volumeName == null) {
-      throw new OMException("Potential DB error. volumeName "
-          + "field is null for tenantId '" + tenantId + "'.",
-          OMException.ResultCodes.VOLUME_NOT_FOUND);
+      throw new OMException("Volume for tenant '" + tenantId +
+          "' is not set!", OMException.ResultCodes.VOLUME_NOT_FOUND);
     }
 
     return volumeName;
@@ -834,6 +998,33 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     }
   }
 
+  @Override
+  public Tenant getTenantFromDBById(String tenantId) throws IOException {
+
+    // Policy names (not cached at the moment) have to retrieved from OM DB.
+    // TODO: Store policy names in cache as well if needed.
+
+    final OmDBTenantState tenantState =
+        omMetadataManager.getTenantStateTable().get(tenantId);
+
+    if (tenantState == null) {
+      throw new OMException("Tenant '" + tenantId + "' does not exist",
+          OMException.ResultCodes.TENANT_NOT_FOUND);
+    }
+
+    final Tenant tenantObj = new OzoneTenant(tenantState.getTenantId());
+
+    tenantObj.addTenantAccessPolicy(
+        new RangerAccessPolicy(tenantState.getBucketNamespacePolicyName()));
+    tenantObj.addTenantAccessPolicy(
+        new RangerAccessPolicy(tenantState.getBucketNamespaceName()));
+
+    tenantObj.addTenantAccessRole(tenantState.getUserRoleName());
+    tenantObj.addTenantAccessRole(tenantState.getAdminRoleName());
+
+    return tenantObj;
+  }
+
   @Override
   public boolean isUserAccessIdPrincipalOrTenantAdmin(String accessId,
       UserGroupInformation ugi) throws IOException {
@@ -888,6 +1079,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return tenantCache.get(tenantId).isTenantEmpty();
   }
 
+  @VisibleForTesting
   public Map<String, CachedTenantState> getTenantCache() {
     return tenantCache;
   }
@@ -949,4 +1141,9 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       usersInTheRole.add(userPrincipal);
     }
   }
+
+  @Override
+  public AuthorizerLock getAuthorizerLock() {
+    return authorizerLock;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TenantOp.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TenantOp.java
new file mode 100644
index 0000000000..6b5c288278
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TenantOp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+
+import java.io.IOException;
+
+/**
+ * Interface for tenant operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TenantOp {
+  /**
+   * Given a tenant name, create tenant roles and policies in the authorizer
+   * (Ranger). Then return a Tenant object.
+   *
+   * @param tenantId tenant name
+   * @param userRoleName user role name
+   * @param adminRoleName admin role name
+   */
+  void createTenant(String tenantId, String userRoleName,
+      String adminRoleName) throws IOException;
+
+  /**
+   * Given a Tenant object, remove all policies and roles from Ranger that are
+   * added during tenant creation.
+   * Note this differs from deactivateTenant() above.
+   *
+   * @param tenant tenant object
+   */
+  void deleteTenant(Tenant tenant) throws IOException;
+
+  /**
+   * Creates a new user that exists for S3 API access to Ozone.
+   *
+   * @param userPrincipal user principal
+   * @param tenantId tenant name
+   * @param accessId access ID
+   */
+  void assignUserToTenant(String userPrincipal,
+      String tenantId, String accessId) throws IOException;
+
+  /**
+   * Revoke user accessId in a tenant.
+   *
+   * @param accessId access ID
+   * @param tenantId tenant name
+   */
+  void revokeUserAccessId(String accessId, String tenantId) throws IOException;
+
+  /**
+   * Given an accessId, grant that user admin privilege in the tenant.
+   *
+   * @param accessId access ID
+   * @param delegated true if intending to assign as the user as the tenant's
+   *                  delegated admin (who can assign and revoke other tenant
+   *                  admins in this tenant)
+   */
+  void assignTenantAdmin(String accessId, boolean delegated) throws IOException;
+
+  /**
+   * Given an accessId, revoke that user's admin privilege in that tenant.
+   *
+   * @param accessId access ID
+   */
+  void revokeTenantAdmin(String accessId) throws IOException;
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLock.java
new file mode 100644
index 0000000000..448e9d789a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLock.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.multitenant;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.AuthorizerOp;
+
+import java.io.IOException;
+
+/**
+ * Authorizer access lock interface. Used by OMMultiTenantManager.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AuthorizerLock {
+  /**
+   * Attempt to acquire the read lock to the authorizer with a timeout.
+   * @return stamp returned from lock op. required when releasing the lock
+   */
+  long tryReadLock(long timeout) throws InterruptedException;
+
+  /**
+   * Release the read lock to the authorizer.
+   * Throws IllegalMonitorStateException if the provided stamp is incorrect.
+   */
+  void unlockRead(long stamp);
+
+  /**
+   * A wrapper around tryReadLock() that throws when timed out.
+   * @return stamp
+   */
+  long tryReadLockThrowOnTimeout() throws IOException;
+
+  /**
+   * Attempt to acquire the write lock to authorizer with a timeout.
+   * @return stamp
+   */
+  long tryWriteLock(long timeout) throws InterruptedException;
+
+  /**
+   * Release the write lock to the authorizer.
+   * Throws IllegalMonitorStateException if the provided stamp is incorrect.
+   */
+  void unlockWrite(long stamp);
+
+  /**
+   * A wrapper around tryWriteLock() that throws when timed out.
+   * @return stamp
+   */
+  long tryWriteLockThrowOnTimeout() throws IOException;
+
+  /**
+   * A wrapper around tryWriteLockThrowOnTimeout() that is used exclusively
+   * in OMRequests.
+   *
+   * MUST use paired with unlockWriteInOMRequest() for unlocking to ensure
+   * correctness.
+   */
+  void tryWriteLockInOMRequest() throws IOException;
+
+  /**
+   * A wrapper around unlockWrite() that is used exclusively in OMRequests.
+   */
+  void unlockWriteInOMRequest();
+
+  /**
+   * Returns true if the authorizer write lock is held by the current thread.
+   * Used in {@link AuthorizerOp}.
+   */
+  boolean isWriteLockHeldByCurrentThread();
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLockImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLockImpl.java
new file mode 100644
index 0000000000..1bb9b15b72
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/AuthorizerLockImpl.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.multitenant;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_AUTHORIZER_LOCK_WAIT_MILLIS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
+
+/**
+ * Implementation of {@link AuthorizerLock}.
+ */
+public class AuthorizerLockImpl implements AuthorizerLock {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AuthorizerLockImpl.class);
+
+  private final StampedLock authorizerStampedLock = new StampedLock();
+
+  // No need to use atomic here as both fields can only be updated after
+  // authorizer write lock is acquired.
+  private long omRequestWriteLockStamp = 0L;
+  private long omRequestWriteLockHolderTid = 0L;
+
+  @Override
+  public long tryReadLock(long timeout) throws InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to acquire authorizer read lock from thread {}",
+          Thread.currentThread().getId());
+    }
+    return authorizerStampedLock.tryReadLock(timeout, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Release read lock on the authorizer.
+   * This is only used by BG sync at the moment.
+   */
+  @Override
+  public void unlockRead(long stamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Releasing authorizer read lock from thread {} with stamp {}",
+          Thread.currentThread().getId(), stamp);
+    }
+    authorizerStampedLock.unlockRead(stamp);
+  }
+
+  @Override
+  public long tryReadLockThrowOnTimeout() throws IOException {
+
+    long stamp;
+    try {
+      stamp = tryReadLock(OZONE_TENANT_AUTHORIZER_LOCK_WAIT_MILLIS);
+    } catch (InterruptedException e) {
+      throw new OMException(e, INTERNAL_ERROR);
+    }
+    if (stamp == 0L) {
+      throw new OMException("Timed out acquiring authorizer read lock."
+          + " Another multi-tenancy request is in-progress. Try again later",
+          ResultCodes.TIMEOUT);
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Acquired authorizer read lock from thread {} with stamp {}",
+          Thread.currentThread().getId(), stamp);
+    }
+    return stamp;
+  }
+
+  @Override
+  public long tryWriteLock(long timeout) throws InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to acquire authorizer write lock from thread {}",
+          Thread.currentThread().getId());
+    }
+    return authorizerStampedLock.tryWriteLock(timeout, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Release read lock on the authorizer.
+   * This is used by both BG sync and tenant requests.
+   */
+  @Override
+  public void unlockWrite(long stamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Releasing authorizer write lock from thread {} with stamp {}",
+          Thread.currentThread().getId(), stamp);
+    }
+    authorizerStampedLock.unlockWrite(stamp);
+  }
+
+  @Override
+  public long tryWriteLockThrowOnTimeout() throws IOException {
+
+    long stamp;
+    try {
+      stamp = tryWriteLock(OZONE_TENANT_AUTHORIZER_LOCK_WAIT_MILLIS);
+    } catch (InterruptedException e) {
+      throw new OMException(e, INTERNAL_ERROR);
+    }
+    if (stamp == 0L) {
+      throw new OMException("Timed out acquiring authorizer write lock. "
+          + "Another multi-tenancy request is in-progress. Try again later",
+          ResultCodes.TIMEOUT);
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Acquired authorizer write lock from thread {} with stamp {}",
+          Thread.currentThread().getId(), stamp);
+    }
+    return stamp;
+  }
+
+  @Override
+  public void tryWriteLockInOMRequest() throws IOException {
+
+    // Sanity check. Must not have held a write lock in a tenant OMRequest.
+    Preconditions.checkArgument(omRequestWriteLockStamp == 0L);
+    Preconditions.checkArgument(omRequestWriteLockHolderTid == 0L);
+
+    long stamp = tryWriteLockThrowOnTimeout();
+    omRequestWriteLockStamp = stamp;
+    omRequestWriteLockHolderTid = Thread.currentThread().getId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Set omRequestWriteLockStamp to {}, "
+          + "omRequestWriteLockHolderTid to {}",
+          omRequestWriteLockStamp, omRequestWriteLockHolderTid);
+    }
+  }
+
+  @Override
+  public void unlockWriteInOMRequest() {
+
+    if (omRequestWriteLockStamp == 0L) {
+      LOG.debug("Authorizer write lock is not held in this lock instance. "
+          + "This OM might be follower, or leader changed. Ignored");
+      return;
+    }
+
+    final long stamp = omRequestWriteLockStamp;
+
+    // Reset the internal lock stamp record back to zero.
+    omRequestWriteLockStamp = 0L;
+    omRequestWriteLockHolderTid = 0L;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restored omRequestWriteLockStamp to {}, "
+              + "omRequestWriteLockHolderTid to {}",
+          omRequestWriteLockStamp, omRequestWriteLockHolderTid);
+    }
+    unlockWrite(stamp);
+  }
+
+  @Override
+  public boolean isWriteLockHeldByCurrentThread() {
+
+    if (omRequestWriteLockHolderTid == 0L) {
+      LOG.debug("Write lock is not held by any OMRequest thread");
+      return false;
+    }
+
+    if (omRequestWriteLockHolderTid != Thread.currentThread().getId()) {
+      LOG.debug("Write lock is not held by current thread");
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
index fc2d652711..b8fe50d60a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
@@ -54,42 +54,43 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
 
   /**
    * Assign user to an existing role in the Authorizer.
-   * @param principal User principal
-   * @param existingRole A JSON String representation of the existing role
-   *                     returned from the Authorizer (Ranger).
-   * @param isAdmin
+   *
+   * @param userPrincipal user principal
+   * @param existingRole  A JSON String representation of the existing role
+   *                      returned from the Authorizer (Ranger).
+   * @param isAdmin true if tenant admin
    * @return unique and opaque userID that can be used to refer to the user in
    * MultiTenantGateKeeperplugin Implementation. E.g. a Ranger
    * based Implementation can return some ID thats relevant for it.
    */
-  String assignUserToRole(BasicUserPrincipal principal, String existingRole,
+  String assignUserToRole(String userPrincipal, String existingRole,
       boolean isAdmin) throws IOException;
 
   /**
    * Update the exising role details and push the changes to Ranger.
    *
-   * @param principal contains user name, must be an existing user in Ranger.
-   * @param existingRole An existing role's JSON response String from Ranger.
+   * @param userPrincipal user name that exists in Ranger (internal / external).
+   * @param existingRole  An existing role's JSON response String from Ranger.
    * @return roleId (not useful for now)
    * @throws IOException
    */
-  String revokeUserFromRole(BasicUserPrincipal principal,
-                                   String existingRole) throws IOException;
+  String revokeUserFromRole(String userPrincipal, String existingRole)
+      throws IOException;
 
   /**
    * Assign all the users to an existing role.
    * @param users list of user principals
    * @param existingRole roleName
    */
-  String assignAllUsers(HashSet<String> users,
-                               String existingRole) throws IOException;
+  String assignAllUsers(HashSet<String> users, String existingRole)
+      throws IOException;
 
   /**
-   * @param principal
+   * @param userPrincipal
    * @return Unique userID maintained by the authorizer plugin.
    * @throws IOException
    */
-  String getUserId(BasicUserPrincipal principal) throws IOException;
+  String getUserId(String userPrincipal) throws IOException;
 
   /**
    * @param principal
@@ -143,26 +144,30 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
    * @param groupID : unique opaque ID that was returned by
    *                MultiTenantGatekeeper in createGroup().
    */
-  void deleteRole(String groupID) throws IOException;
+  void deleteRoleById(String groupID) throws IOException;
 
   /**
+   * Create access policy with the given parameters in the authorizer
+   * (e.g. Ranger).
    *
-   * @param policy
+   * @param policy AccessPolicy
    * @return unique and opaque policy ID that is maintained by the plugin.
    * @throws IOException
    */
   String createAccessPolicy(AccessPolicy policy) throws IOException;
 
   /**
+   * Get AccessPolicy by policy name.
    *
-   * @param policyName
-   * @return unique and opaque policy ID that is maintained by the plugin.
+   * @param policyName policy name
+   * @return AccessPolicy
    * @throws IOException
    */
   AccessPolicy getAccessPolicyByName(String policyName) throws IOException;
 
   /**
-   * given a policy Id, returs the policy.
+   * Given a policy Id, returns the policy.
+   *
    * @param policyId
    * @return
    * @throws IOException
@@ -170,18 +175,29 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
   AccessPolicy getAccessPolicyById(String policyId) throws IOException;
 
   /**
+   * Delete policy by policy ID.
    *
-   * @param policyId that was returned earlier by the createAccessPolicy().
+   * @param policyId ID that was returned earlier by createAccessPolicy().
    * @throws IOException
    */
   void deletePolicyById(String policyId) throws IOException;
 
   /**
+   * Delete policy by policy name.
    *
-   * @param policyName unique policyName.
+   * @param policyName policy name
    * @throws IOException
    */
   void deletePolicyByName(String policyName) throws IOException;
+
+  /**
+   * Delete role by role name.
+   *
+   * @param roleName role name
+   * @throws IOException
+   */
+  void deleteRoleByName(String roleName) throws IOException;
+
   /**
    * Grant user aclType access to bucketNameSpace.
    * @param bucketNameSpace
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
index cb6e198993..c88f332a46 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
@@ -46,14 +46,13 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
   }
 
   @Override
-  public String assignUserToRole(BasicUserPrincipal principal,
+  public String assignUserToRole(String userPrincipal,
       String existingRole, boolean isAdmin) {
     return "roleId";
   }
 
   @Override
-  public String revokeUserFromRole(BasicUserPrincipal principal,
-      String existingRole) {
+  public String revokeUserFromRole(String userPrincipal, String existingRole) {
     return "roleId";
   }
 
@@ -64,7 +63,7 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
   }
 
   @Override
-  public String getUserId(BasicUserPrincipal principal) throws IOException {
+  public String getUserId(String userPrincipal) throws IOException {
     return null;
   }
 
@@ -96,7 +95,7 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
   }
 
   @Override
-  public void deleteRole(String groupID) throws IOException {
+  public void deleteRoleById(String groupID) throws IOException {
 
   }
 
@@ -120,6 +119,11 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
 
   }
 
+  @Override
+  public void deleteRoleByName(String roleName) throws IOException {
+
+  }
+
   @Override
   public void deletePolicyByName(String policyName) throws IOException {
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
index 0e4d37b8a1..0ac783f02d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
@@ -47,7 +47,9 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
@@ -93,7 +95,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   // Stores Ranger cm_ozone service ID. This value should not change (unless
   // somehow Ranger cm_ozone service is deleted and re-created while OM is
   // still running and not reloaded / restarted).
-  private int rangerOzoneServiceId;
+  private int rangerOzoneServiceId = -1;
 
   @Override
   public void init(Configuration configuration) throws IOException {
@@ -108,13 +110,35 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     initializeRangerConnection();
 
     // Get Ranger Ozone service ID
-    rangerOzoneServiceId = retrieveRangerOzoneServiceId();
+    try {
+      rangerOzoneServiceId = retrieveRangerOzoneServiceId();
+    } catch (SocketTimeoutException | ConnectException e) {
+      // Exceptions (e.g. ConnectException: Connection refused)
+      // thrown here can crash OM during startup.
+      // Tolerate potential connection failure to Ranger during initialization
+      // due to cluster services starting up at the same time or not ready yet.
+      // Later when the Ranger Ozone service ID would be used it should try
+      // and retrieve the ID again if it failed earlier.
+      LOG.error("Failed to get Ozone service ID to Ranger. "
+              + "Will retry later", e);
+      rangerOzoneServiceId = -1;
+    }
   }
 
   int getRangerOzoneServiceId() {
     return rangerOzoneServiceId;
   }
 
+  /**
+   * Helper method that checks if the RangerOzoneServiceId is properly retrieved
+   * during init. If not, try to get it from Ranger.
+   */
+  private void checkRangerOzoneServiceId() throws IOException {
+    if (rangerOzoneServiceId < 0) {
+      rangerOzoneServiceId = retrieveRangerOzoneServiceId();
+    }
+  }
+
   private void initializeRangerConnection() {
     setupRangerConnectionConfig();
     if (ignoreServerCert) {
@@ -257,10 +281,10 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   }
 
   @Override
-  public String getUserId(BasicUserPrincipal principal) throws IOException {
+  public String getUserId(String userPrincipal) throws IOException {
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT +
-        principal.getName();
+        userPrincipal;
 
     HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
         "GET", false);
@@ -272,7 +296,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
       int numIndex = userinfo.size();
       for (int i = 0; i < numIndex; ++i) {
         if (userinfo.get(i).getAsJsonObject().get("name").getAsString()
-            .equals(principal.getName())) {
+            .equals(userPrincipal)) {
           userIDCreated =
               userinfo.get(i).getAsJsonObject().get("id").getAsString();
           break;
@@ -289,13 +313,13 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   /**
    * Update the exising role details and push the changes to Ranger.
    *
-   * @param principal contains user name, must be an existing user in Ranger.
-   * @param existingRole An existing role's JSON response String from Ranger.
+   * @param userPrincipal user name that exists in Ranger.
+   * @param existingRole  An existing role's JSON response String from Ranger.
    * @return roleId (not useful for now)
    * @throws IOException
    */
   @Override
-  public String revokeUserFromRole(BasicUserPrincipal principal,
+  public String revokeUserFromRole(String userPrincipal,
       String existingRole) throws IOException {
     JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
     // Parse Json
@@ -307,7 +331,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
 
     for (int i = 0; i < oldUsersArray.size(); ++i) {
       JsonObject newUserEntry = oldUsersArray.get(i).getAsJsonObject();
-      if (!newUserEntry.get("name").getAsString().equals(principal.getName())) {
+      if (!newUserEntry.get("name").getAsString().equals(userPrincipal)) {
         newUsersArray.add(newUserEntry);
       }
       // Update Json array
@@ -343,13 +367,13 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   /**
    * Update the exising role details and push the changes to Ranger.
    *
-   * @param principal contains user name, must be an existing user in Ranger.
-   * @param existingRole An existing role's JSON response String from Ranger.
-   * @param isAdmin Make it delegated admin of the role.
+   * @param userPrincipal user name that exists in Ranger.
+   * @param existingRole  An existing role's JSON response String from Ranger.
+   * @param isAdmin       Make it delegated admin of the role.
    * @return roleId (not useful for now)
    * @throws IOException
    */
-  public String assignUserToRole(BasicUserPrincipal principal,
+  public String assignUserToRole(String userPrincipal,
       String existingRole, boolean isAdmin) throws IOException {
 
     JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
@@ -359,7 +383,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
 
     JsonArray usersArray = roleObj.getAsJsonArray("users");
     JsonObject newUserEntry = new JsonObject();
-    newUserEntry.addProperty("name", principal.getName());
+    newUserEntry.addProperty("name", userPrincipal);
     newUserEntry.addProperty("isAdmin", isAdmin);
     usersArray.add(newUserEntry);
     // Update Json array
@@ -608,6 +632,9 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   }
 
   public long getLatestOzoneServiceVersion() throws IOException {
+
+    checkRangerOzoneServiceId();
+
     String rangerAdminUrl = rangerHttpsAddress
         + OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT + getRangerOzoneServiceId();
 
@@ -629,6 +656,8 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
 
   public String getAllMultiTenantPolicies() throws IOException {
 
+    checkRangerOzoneServiceId();
+
     // Note: Ranger incremental policies API is broken. So we use policy label
     // filter to get all Multi-Tenant policies.
 
@@ -670,10 +699,26 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     }
   }
 
-  public void deleteRole(String roleName) throws IOException {
+  @Override
+  public void deleteRoleById(String roleId) throws IOException {
 
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT
+            + roleId + "?forceDelete=true";
+
+    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
+        "DELETE", false);
+    int respnseCode = conn.getResponseCode();
+    if (respnseCode != 200 && respnseCode != 204) {
+      throw new IOException("Couldn't delete role " + roleId);
+    }
+  }
+
+  @Override
+  public void deleteRoleByName(String roleName) throws IOException {
+
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT
             + roleName + "?forceDelete=true";
 
     HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
@@ -682,12 +727,13 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     if (respnseCode != 200 && respnseCode != 204) {
       throw new IOException("Couldn't delete role " + roleName);
     }
+
   }
 
   @Override
   public void deletePolicyByName(String policyName) throws IOException {
     AccessPolicy policy = getAccessPolicyByName(policyName);
-    String  policyID = policy.getPolicyID();
+    String policyID = policy.getPolicyID();
     LOG.debug("policyID is: {}", policyID);
     deletePolicyById(policyID);
   }
@@ -720,6 +766,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         response.append(responseLine.trim());
       }
       LOG.debug("Got response: {}", response);
+      // TODO: throw if urlConnection code is 400?
     } catch (IOException e) {
       // Common exceptions:
       // 1. Server returned HTTP response code: 401
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
index 1b32088aa4..444907c598 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.om.multitenant;
 
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.http.auth.BasicUserPrincipal;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -138,7 +137,7 @@ public interface MultiTenantAccessController {
    */
   class Role {
     private final String name;
-    private final Set<BasicUserPrincipal> users;
+    private final Set<String> users;
     private final String description;
     private final Long roleID;
 
@@ -153,7 +152,7 @@ public interface MultiTenantAccessController {
       return name;
     }
 
-    public Set<BasicUserPrincipal> getUsers() {
+    public Set<String> getUsers() {
       return users;
     }
 
@@ -197,7 +196,7 @@ public interface MultiTenantAccessController {
      */
     public static final class Builder {
       private String name;
-      private final Set<BasicUserPrincipal> users;
+      private final Set<String> users;
       private String description;
       private Long roleID;
 
@@ -217,12 +216,12 @@ public interface MultiTenantAccessController {
         return this;
       }
 
-      public Builder addUser(BasicUserPrincipal user) {
+      public Builder addUser(String user) {
         this.users.add(user);
         return this;
       }
 
-      public Builder addUsers(Collection<BasicUserPrincipal> roleUsers) {
+      public Builder addUsers(Collection<String> roleUsers) {
         this.users.addAll(roleUsers);
         return this;
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
index 323a494b97..047cf6d27f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
@@ -90,6 +90,7 @@ public class OMRangerBGSyncService extends BackgroundService {
   private final OMMetadataManager metadataManager;
   private final OMMultiTenantManager multiTenantManager;
   private final MultiTenantAccessAuthorizer authorizer;
+  private final AuthorizerLock authorizerLock;
 
   // Maximum number of attempts for each sync run
   private static final int MAX_ATTEMPT = 2;
@@ -198,14 +199,18 @@ public class OMRangerBGSyncService extends BackgroundService {
   private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
 
   public OMRangerBGSyncService(OzoneManager ozoneManager,
-      MultiTenantAccessAuthorizer authorizer, long interval,
-      TimeUnit unit, long serviceTimeout) {
+      OMMultiTenantManager omMultiTenantManager,
+      MultiTenantAccessAuthorizer authorizer,
+      long interval, TimeUnit unit, long serviceTimeout) {
 
     super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
 
     this.ozoneManager = ozoneManager;
     this.metadataManager = ozoneManager.getMetadataManager();
-    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+    // Note: ozoneManager.getMultiTenantManager() may return null because
+    // it might haven't finished initialization.
+    this.multiTenantManager = omMultiTenantManager;
+    this.authorizerLock = omMultiTenantManager.getAuthorizerLock();
 
     if (authorizer != null) {
       this.authorizer = authorizer;
@@ -248,6 +253,11 @@ public class OMRangerBGSyncService extends BackgroundService {
       // OzoneManager can be null for testing
       return true;
     }
+    if (ozoneManager.isRatisEnabled() &&
+        (ozoneManager.getOmRatisServer() == null)) {
+      LOG.warn("OzoneManagerRatisServer is not initialized yet");
+      return false;
+    }
     // The service only runs if current OM node is leader and is ready
     //  and the service is marked as started
     return isServiceStarted && ozoneManager.isLeaderReady();
@@ -264,7 +274,11 @@ public class OMRangerBGSyncService extends BackgroundService {
     public BackgroundTaskResult call() {
       // Check OM leader and readiness
       if (shouldRun()) {
-        runCount.incrementAndGet();
+        final long count = runCount.incrementAndGet();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initiating Ranger Multi-Tenancy Ranger Sync: run # {}",
+              count);
+        }
         triggerRangerSyncOnce();
       }
 
@@ -273,9 +287,7 @@ public class OMRangerBGSyncService extends BackgroundService {
   }
 
   private void triggerRangerSyncOnce() {
-    int attempt = 0;
     try {
-      // TODO: Acquire lock
       long dbOzoneServiceVersion = getOMDBRangerServiceVersion();
       long rangerOzoneServiceVersion = getLatestRangerServiceVersion();
 
@@ -298,8 +310,9 @@ public class OMRangerBGSyncService extends BackgroundService {
       // A maximum of MAX_ATTEMPT times will be attempted each time the sync
       // service is run. MAX_ATTEMPT should at least be 2 to make sure OM DB
       // has the up-to-date Ranger service version most of the times.
+      int attempt = 0;
       while (dbOzoneServiceVersion != rangerOzoneServiceVersion) {
-        // TODO: Release lock
+
         if (++attempt > MAX_ATTEMPT) {
           if (LOG.isDebugEnabled()) {
             LOG.warn("Reached maximum number of attempts ({}). Abort",
@@ -308,32 +321,28 @@ public class OMRangerBGSyncService extends BackgroundService {
           break;
         }
 
-        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}. "
-                + "Ranger service version: {}, DB version :{}",
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run # {}, attempt # {}. "
+                + "Ranger service version: {}, DB service version: {}",
             runCount.get(), attempt,
             rangerOzoneServiceVersion, dbOzoneServiceVersion);
 
         executeOMDBToRangerSync(dbOzoneServiceVersion);
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+          LOG.debug("Setting DB Ranger service version to {} (was {})",
               rangerOzoneServiceVersion, dbOzoneServiceVersion);
         }
         // Submit Ratis Request to sync the new service version in OM DB
         setOMDBRangerServiceVersion(rangerOzoneServiceVersion);
 
-        // TODO: Acquire lock
-
-        // Check Ranger ozone service version again
+        // Check Ranger Ozone service version again
         dbOzoneServiceVersion = rangerOzoneServiceVersion;
         rangerOzoneServiceVersion = getLatestRangerServiceVersion();
       }
     } catch (IOException | ServiceException e) {
       LOG.warn("Exception during Ranger Sync", e);
-      // TODO: Check specific exception once switched to
+      // TODO: Check for specific exception once switched to
       //  RangerRestMultiTenantAccessController
-//    } finally {
-//      // TODO: Release lock
     }
 
   }
@@ -394,11 +403,17 @@ public class OMRangerBGSyncService extends BackgroundService {
   private void executeOMDBToRangerSync(long baseVersion) throws IOException {
     clearPolicyAndRoleMaps();
 
-    // TODO: Acquire global lock
-    loadAllPoliciesAndRoleNamesFromRanger(baseVersion);
-    loadAllRolesFromRanger();
-    loadAllRolesFromOM();
-    // TODO: Release global lock
+
+    withReadLock(() -> {
+      try {
+        loadAllPoliciesAndRoleNamesFromRanger(baseVersion);
+        loadAllRolesFromRanger();
+        loadAllRolesFromOM();
+      } catch (IOException e) {
+        LOG.error("Failed to load policies or roles from Ranger or DB", e);
+        throw new RuntimeException(e);
+      }
+    });
 
     // This should isolate policies into two groups
     // 1. mtRangerPoliciesTobeDeleted and
@@ -430,6 +445,10 @@ public class OMRangerBGSyncService extends BackgroundService {
     String allPolicies = authorizer.getAllMultiTenantPolicies();
     JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
     JsonArray policies = jObject.getAsJsonArray("policies");
+    if (policies == null) {
+      LOG.warn("No Ranger policy received!");
+      return;
+    }
     for (int i = 0; i < policies.size(); ++i) {
       JsonObject policy = policies.get(i).getAsJsonObject();
       JsonArray policyLabels = policy.getAsJsonArray("policyLabels");
@@ -474,6 +493,7 @@ public class OMRangerBGSyncService extends BackgroundService {
         }
       }
     }
+
   }
 
   /**
@@ -523,6 +543,34 @@ public class OMRangerBGSyncService extends BackgroundService {
     }
   }
 
+  /**
+   * Helper function to run the block with read lock held.
+   */
+  private void withReadLock(Runnable block) throws IOException {
+    // Acquire authorizer (Ranger) read lock
+    long stamp = authorizerLock.tryReadLockThrowOnTimeout();
+    try {
+      block.run();
+    } finally {
+      // Release authorizer (Ranger) read lock
+      authorizerLock.unlockRead(stamp);
+    }
+  }
+
+  /**
+   * Helper function to run the block with write lock held.
+   */
+  private void withWriteLock(Runnable block) throws IOException {
+    // Acquire authorizer (Ranger) write lock
+    long stamp = authorizerLock.tryWriteLockThrowOnTimeout();
+    try {
+      block.run();
+    } finally {
+      // Release authorizer (Ranger) write lock
+      authorizerLock.unlockWrite(stamp);
+    }
+  }
+
   private void processAllPoliciesFromOMDB() throws IOException {
 
     // Iterate all DB tenant states. For each tenant,
@@ -559,7 +607,14 @@ public class OMRangerBGSyncService extends BackgroundService {
       final String policyName = entry.getKey();
       LOG.info("Deleting policy from Ranger: {}", policyName);
       checkLeader();
-      authorizer.deletePolicyByName(policyName);
+      withWriteLock(() -> {
+        try {
+          authorizer.deletePolicyByName(policyName);
+        } catch (IOException e) {
+          LOG.error("Failed to delete policy: {}", policyName, e);
+          // Proceed to delete other policies
+        }
+      });
     }
 
   }
@@ -603,8 +658,14 @@ public class OMRangerBGSyncService extends BackgroundService {
           ResultCodes.INTERNAL_ERROR);
     }
 
-    String id = authorizer.createAccessPolicy(accessPolicy);
-    LOG.info("Created policy, ID: {}", id);
+    withWriteLock(() -> {
+      try {
+        final String id = authorizer.createAccessPolicy(accessPolicy);
+        LOG.info("Created policy. Policy ID: {}", id);
+      } catch (IOException e) {
+        LOG.error("Failed to create policy: {}", accessPolicy, e);
+      }
+    });
   }
 
   private void loadAllRolesFromOM() throws IOException {
@@ -715,12 +776,14 @@ public class OMRangerBGSyncService extends BackgroundService {
         // mtRangerRoles to be populated incorrectly. In this case the roles
         // are there just fine. If not, will be corrected in the next run anyway
         checkLeader();
-        try {
-          authorizer.createRole(roleName, null);
-        } catch (IOException e) {
-          // Tolerate create role failure, possibly due to role already exists
-          LOG.error(e.getMessage());
-        }
+        withWriteLock(() -> {
+          try {
+            authorizer.createRole(roleName, null);
+          } catch (IOException e) {
+            // Tolerate create role failure, possibly due to role already exists
+            LOG.error("Failed to create role: {}", roleName, e);
+          }
+        });
         pushRoleToRanger = true;
       }
       if (pushRoleToRanger) {
@@ -741,25 +804,33 @@ public class OMRangerBGSyncService extends BackgroundService {
     for (String roleName : rolesToDelete) {
       LOG.warn("Deleting role from Ranger: {}", roleName);
       checkLeader();
-      try {
-        final String roleObj = authorizer.getRole(roleName);
-        authorizer.deleteRole(new JsonParser().parse(roleObj)
-            .getAsJsonObject().get("id").getAsString());
-      } catch (IOException e) {
-        // The role might have been deleted already.
-        // Or the role could be referenced in other roles or policies.
-        LOG.error("Failed to delete role: {}", roleName);
-        throw e;
-      }
+      withWriteLock(() -> {
+        try {
+          final String roleObj = authorizer.getRole(roleName);
+          authorizer.deleteRoleById(new JsonParser().parse(roleObj)
+              .getAsJsonObject().get("id").getAsString());
+        } catch (IOException e) {
+          // The role might have been deleted already.
+          // Or the role could be referenced in other roles or policies.
+          LOG.error("Failed to delete role: {}", roleName);
+        }
+      });
       // TODO: Server returned HTTP response code: 400
       //  if already deleted or is depended on
     }
   }
 
   private void pushOMDBRoleToRanger(String roleName) throws IOException {
-    HashSet<String> omdbUserList = mtOMDBRoles.get(roleName);
-    String roleJsonStr = authorizer.getRole(roleName);
-    authorizer.assignAllUsers(omdbUserList, roleJsonStr);
+    final HashSet<String> omDBUserList = mtOMDBRoles.get(roleName);
+    withWriteLock(() -> {
+      try {
+        String roleJsonStr = authorizer.getRole(roleName);
+        authorizer.assignAllUsers(omDBUserList, roleJsonStr);
+      } catch (IOException e) {
+        LOG.error("Failed to update role: {}, target user list: {}",
+            roleName, omDBUserList, e);
+      }
+    });
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
index b04843789a..8dab66e695 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.apache.ranger.RangerServiceException;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerRole;
@@ -186,16 +185,16 @@ public class RangerClientMultiTenantAccessController implements
   }
 
   private static List<RangerRole.RoleMember> toRangerRoleMembers(
-      Collection<BasicUserPrincipal> members) {
+      Collection<String> members) {
     return members.stream()
-            .map(princ -> new RangerRole.RoleMember(princ.getName(), false))
+            .map(princ -> new RangerRole.RoleMember(princ, false))
             .collect(Collectors.toList());
   }
 
-  private static List<BasicUserPrincipal> fromRangerRoleMembers(
+  private static List<String> fromRangerRoleMembers(
       Collection<RangerRole.RoleMember> members) {
     return members.stream()
-        .map(rangerUser -> new BasicUserPrincipal(rangerUser.getName()))
+        .map(rangerUser -> rangerUser.getName())
         .collect(Collectors.toList());
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
index 0c6db53e1a..e9ca102461 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
@@ -535,7 +535,7 @@ public class RangerRestMultiTenantAccessController
           for (JsonElement jsonUser : roleJson.get("users").getAsJsonArray()) {
             String userName =
                 jsonUser.getAsJsonObject().get("name").getAsString();
-            role.addUser(new BasicUserPrincipal(userName));
+            role.addUser(userName);
           }
 
           return role.build();
@@ -636,7 +636,7 @@ public class RangerRestMultiTenantAccessController
           jsonRole.addProperty("name", javaRole.getName());
 
           JsonArray jsonUserArray = new JsonArray();
-          for (BasicUserPrincipal javaUser : javaRole.getUsers()) {
+          for (String javaUser : javaRole.getUsers()) {
             jsonUserArray.add(jsonConverter.toJsonTree(javaUser));
           }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
index 63318f0161..81abd8df21 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
@@ -96,31 +96,10 @@ public class OMSetSecretRequest extends OMClientRequest {
           OMException.ResultCodes.INVALID_REQUEST);
     }
 
-    // TODO: Check if secretKey matches other requirements? e.g. combination
-
     final UserGroupInformation ugi = ProtobufRpcEngine.Server.getRemoteUser();
-    final String username = ugi.getUserName();
-
-    // Permission check. To pass the check, any one of the following conditions
-    // shall be satisfied:
-    // 1. username matches accessId exactly
-    // 2. user is an OM admin
-    // 3. user is assigned to a tenant under this accessId
-    // 4. user is an admin of the tenant where the accessId is assigned
-
-    if (!username.equals(accessId) && !ozoneManager.isAdmin(ugi)) {
-      // Attempt to retrieve tenant info using the accessId
-      if (!ozoneManager.getMultiTenantManager()
-          .isUserAccessIdPrincipalOrTenantAdmin(accessId, ugi)) {
-        throw new OMException("Permission denied. Requested accessId '" +
-                accessId + "' and user doesn't satisfy any of:\n" +
-                "1) accessId match current username: '" + username + "';\n" +
-                "2) is an OM admin;\n" +
-                "3) user is assigned to a tenant under this accessId;\n" +
-                "4) user is an admin of the tenant where the accessId is " +
-                "assigned", OMException.ResultCodes.PERMISSION_DENIED);
-      }
-    }
+    // Permission check
+    S3SecretRequestHelper.checkAccessIdSecretOpPermission(
+        ozoneManager, ugi, accessId);
 
     return getOmRequest();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
index ce5fe50dca..fe595ea9ec 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
@@ -66,24 +66,21 @@ public class S3GetSecretRequest extends OMClientRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    GetS3SecretRequest s3GetSecretRequest =
+
+    final GetS3SecretRequest s3GetSecretRequest =
         getOmRequest().getGetS3SecretRequest();
 
-    // Generate S3 Secret to be used by OM quorum.
-    // Note 1: The proto field kerberosID is effectively accessId already.
-    // It is still named kerberosID because kerberosID == accessId before
-    // multi-tenancy. TODO: Rename the kerberosID field later in master branch.
-    String accessId = s3GetSecretRequest.getKerberosID();
+    // The proto field kerberosID is effectively accessId w/ Multi-Tenancy
+    //
+    // But it is still named kerberosID because kerberosID == accessId before
+    // multi-tenancy feature is implemented. And renaming proto field fails the
+    // protolock check.
+    final String accessId = s3GetSecretRequest.getKerberosID();
 
     final UserGroupInformation ugi = ProtobufRpcEngine.Server.getRemoteUser();
-    final String username = ugi.getUserName();
-    // Permission check. Users need to be themselves or have admin privilege
-    if (!username.equals(accessId) && !ozoneManager.isAdmin(ugi)) {
-      throw new OMException("Requested accessId '" + accessId +
-          "' doesn't match current user '" + username +
-          "', nor does current user has administrator privilege.",
-          OMException.ResultCodes.USER_MISMATCH);
-    }
+    // Permission check
+    S3SecretRequestHelper.checkAccessIdSecretOpPermission(
+        ozoneManager, ugi, accessId);
 
     // Client issues GetS3Secret request, when received by OM leader
     // it will generate s3Secret. Original GetS3Secret request is
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
index b61e70a34a..314ab1ec4b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -62,21 +61,15 @@ public class S3RevokeSecretRequest extends OMClientRequest {
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     final RevokeS3SecretRequest s3RevokeSecretRequest =
         getOmRequest().getRevokeS3SecretRequest();
-    final String kerberosID = s3RevokeSecretRequest.getKerberosID();
+    final String accessId = s3RevokeSecretRequest.getKerberosID();
     final UserGroupInformation ugi = ProtobufRpcEngine.Server.getRemoteUser();
-    final String username = ugi.getUserName();
-    // Permission check. Users need to be themselves or have admin privilege
-    if (!username.equals(kerberosID) &&
-        !ozoneManager.isAdmin(ugi)) {
-      throw new OMException("Requested user name '" + kerberosID +
-          "' doesn't match current user '" + username +
-          "', nor does current user has administrator privilege.",
-          OMException.ResultCodes.USER_MISMATCH);
-    }
+    // Permission check
+    S3SecretRequestHelper.checkAccessIdSecretOpPermission(
+        ozoneManager, ugi, accessId);
 
     final RevokeS3SecretRequest revokeS3SecretRequest =
             RevokeS3SecretRequest.newBuilder()
-                    .setKerberosID(kerberosID).build();
+                    .setKerberosID(accessId).build();
 
     OMRequest.Builder omRequest = OMRequest.newBuilder()
         .setRevokeS3SecretRequest(revokeS3SecretRequest)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3SecretRequestHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3SecretRequestHelper.java
new file mode 100644
index 0000000000..9d59272f83
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3SecretRequestHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.request.s3.security;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Common helper function for S3 secret requests.
+ */
+public final class S3SecretRequestHelper {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3SecretRequestHelper.class);
+
+  private S3SecretRequestHelper() {
+  }
+
+  /**
+   * Checks whether the ugi has the permission to operate (get secret,
+   * set secret, revoke secret) on the given access ID.
+   *
+   * Throws OMException if the UGI doesn't have the permission.
+   */
+  public static void checkAccessIdSecretOpPermission(
+      OzoneManager ozoneManager, UserGroupInformation ugi, String accessId)
+      throws IOException {
+
+    final String username = ugi.getUserName();
+
+    // Flag indicating whether the accessId is assigned to a tenant
+    // (under S3 Multi-Tenancy feature) or not.
+    boolean isAccessIdAssignedToTenant = false;
+
+    // Permission check:
+    //
+    // 1. If multi-tenancy is enabled, caller ugi need to own the access ID or
+    // have Ozone admin or tenant admin privilege to pass the check;
+    if (ozoneManager.isS3MultiTenancyEnabled()) {
+      final OMMultiTenantManager multiTenantManager =
+          ozoneManager.getMultiTenantManager();
+
+      final Optional<String> optionalTenantId =
+          multiTenantManager.getTenantForAccessID(accessId);
+
+      isAccessIdAssignedToTenant = optionalTenantId.isPresent();
+
+      if (isAccessIdAssignedToTenant) {
+
+        final String accessIdOwnerUsername =
+            multiTenantManager.getUserNameGivenAccessId(accessId);
+        final String tenantId = optionalTenantId.get();
+
+        // HDDS-6691: ugi should either own the access ID, or be an Ozone/tenant
+        // admin to pass the check.
+        if (!username.equals(accessIdOwnerUsername) &&
+            !multiTenantManager.isTenantAdmin(ugi, tenantId, false)) {
+          throw new OMException("Requested accessId '" + accessId + "' doesn't"
+              + " belong to current user '" + username + "', nor does"
+              + " current user have Ozone or tenant administrator privilege",
+              ResultCodes.USER_MISMATCH);
+          // Note: A more fitting result code could be PERMISSION_DENIED,
+          //  but existing code already uses USER_MISMATCH. Maybe change this
+          //  later -- could cause minor incompatibility.
+        }
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("S3 Multi-Tenancy is enabled, but the requested accessId "
+            + "'{}' is not assigned to a tenant. Falling back to the old "
+            + "permission check", accessId);
+      }
+    }
+
+    // 2. If S3 multi-tenancy is disabled (or the access ID is not assigned
+    // to a tenant), fall back to the old permission check.
+    if (!isAccessIdAssignedToTenant &&
+        !username.equals(accessId) && !ozoneManager.isAdmin(ugi)) {
+
+      throw new OMException("Requested accessId '" + accessId +
+          "' doesn't match current user '" + username +
+          "', nor does current user has administrator privilege.",
+          OMException.ResultCodes.USER_MISMATCH);
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
index 98553664a6..5eb27505b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
@@ -75,7 +75,6 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
   @DisallowedUntilLayoutVersion(MULTITENANCY_SCHEMA)
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
 
-
     final OMRequest omRequest = super.preExecute(ozoneManager);
     final TenantAssignAdminRequest request =
         omRequest.getTenantAssignAdminRequest();
@@ -90,8 +89,8 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
       Optional<String> optionalTenantId =
           multiTenantManager.getTenantForAccessID(accessId);
       if (!optionalTenantId.isPresent()) {
-        throw new OMException("OmDBAccessIdInfo is missing for accessId '" +
-            accessId + "' in DB.", OMException.ResultCodes.METADATA_ERROR);
+        throw new OMException("accessId '" + accessId + "' is not assigned to "
+            + "any tenant", OMException.ResultCodes.TENANT_NOT_FOUND);
       }
       tenantId = optionalTenantId.get();
       assert (!StringUtils.isEmpty(tenantId));
@@ -124,10 +123,18 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
       delegated = true;
     }
 
-    // TODO: Acquire some lock
-    // Call OMMTM to add user to tenant admin role
-    ozoneManager.getMultiTenantManager().assignTenantAdmin(
-        request.getAccessId(), delegated);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Add user to tenant admin role in Ranger.
+      // User principal is inferred from the accessId given.
+      // Throws if the user doesn't exist in Ranger.
+      multiTenantManager.getAuthorizerOp()
+          .assignTenantAdmin(accessId, delegated);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
+    }
 
     final OMRequest.Builder omRequestBuilder = omRequest.toBuilder()
         .setTenantAssignAdminRequest(
@@ -140,25 +147,15 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
     return omRequestBuilder.build();
   }
 
-  @Override
-  public void handleRequestFailure(OzoneManager ozoneManager) {
-    final TenantAssignAdminRequest request =
-        getOmRequest().getTenantAssignAdminRequest();
-
-    try {
-      ozoneManager.getMultiTenantManager().revokeTenantAdmin(
-          request.getAccessId());
-    } catch (Exception e) {
-      // TODO: Ignore for now. See OMTenantCreateRequest#handleRequestFailure
-    }
-  }
-
   @Override
   @SuppressWarnings("checkstyle:methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantAssignAdmins();
 
@@ -211,6 +208,9 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
           new CacheValue<>(Optional.of(newOmDBAccessIdInfo),
               transactionLogIndex));
 
+      // Update tenant cache
+      multiTenantManager.getCacheOp().assignTenantAdmin(accessId, delegated);
+
       omResponse.setTenantAssignAdminResponse(
           TenantAssignAdminResponse.newBuilder()
               .build());
@@ -218,8 +218,6 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
           accessId, newOmDBAccessIdInfo);
 
     } catch (IOException ex) {
-      // Error handling
-      handleRequestFailure(ozoneManager);
       exception = ex;
       // Prepare omClientResponse
       omClientResponse = new OMTenantAssignAdminResponse(
@@ -231,7 +229,8 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
index 940dfdfc4f..bf8161105a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -46,7 +47,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Secre
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignUserAccessIdRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.TenantAssignUserAccessIdResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpdateGetS3SecretRequest;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +56,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeSet;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MAXIMUM_ACCESS_ID_LENGTH;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MULTITENANCY_SCHEMA;
@@ -98,6 +99,7 @@ import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MULTITENANCY_SC
  * Handles OMAssignUserToTenantRequest.
  */
 public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
+
   public static final Logger LOG =
       LoggerFactory.getLogger(OMTenantAssignUserAccessIdRequest.class);
 
@@ -110,29 +112,40 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
 
     final OMRequest omRequest = super.preExecute(ozoneManager);
+
     final TenantAssignUserAccessIdRequest request =
         omRequest.getTenantAssignUserAccessIdRequest();
 
     final String tenantId = request.getTenantId();
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     // Caller should be an Ozone admin, or at least a tenant non-delegated admin
-    ozoneManager.getMultiTenantManager().checkTenantAdmin(tenantId, false);
+    multiTenantManager.checkTenantAdmin(tenantId, false);
 
     final String userPrincipal = request.getUserPrincipal();
     final String accessId = request.getAccessId();
 
+    // Check accessId length.
+    if (accessId.length() >= OZONE_MAXIMUM_ACCESS_ID_LENGTH) {
+      throw new OMException(
+          "accessId length exceeds the maximum length allowed",
+          ResultCodes.INVALID_ACCESS_ID);
+    }
+
     // Check userPrincipal (username) validity.
     if (userPrincipal.contains(OzoneConsts.TENANT_ID_USERNAME_DELIMITER)) {
       throw new OMException("Invalid tenant username '" + userPrincipal +
           "'. Tenant username shouldn't contain delimiter.",
-          OMException.ResultCodes.INVALID_TENANT_USERNAME);
+          ResultCodes.INVALID_TENANT_USERNAME);
     }
 
     // Check tenant name validity.
     if (tenantId.contains(OzoneConsts.TENANT_ID_USERNAME_DELIMITER)) {
       throw new OMException("Invalid tenant name '" + tenantId +
           "'. Tenant name shouldn't contain delimiter.",
-          OMException.ResultCodes.INVALID_TENANT_ID);
+          ResultCodes.INVALID_TENANT_ID);
     }
 
     // HDDS-6366: Disallow specifying custom accessId.
@@ -142,33 +155,35 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
       throw new OMException("Invalid accessId '" + accessId + "'. "
           + "Specifying a custom access ID disallowed. "
           + "Expected accessId to be assigned is '" + expectedAccessId + "'",
-          OMException.ResultCodes.INVALID_ACCESS_ID);
+          ResultCodes.INVALID_ACCESS_ID);
     }
 
-    ozoneManager.getMultiTenantManager().checkTenantExistence(tenantId);
+    multiTenantManager.checkTenantExistence(tenantId);
 
     // Below call implies user existence check in authorizer.
     // If the user doesn't exist, Ranger return 400 and the call should throw.
 
-    // TODO: Acquire some lock
-    // Call OMMTM
-    // Inform MultiTenantManager of user assignment so it could
-    //  initialize some policies in Ranger.
-    final String roleId = ozoneManager.getMultiTenantManager()
-        .assignUserToTenant(new BasicUserPrincipal(userPrincipal), tenantId,
-            accessId);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("roleId that the user is assigned to: {}", roleId);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Add user to tenant user role in Ranger.
+      // Throws if the user doesn't exist in Ranger.
+      multiTenantManager.getAuthorizerOp()
+          .assignUserToTenant(userPrincipal, tenantId, accessId);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
     }
 
-    // Generate secret. Used only when doesn't the kerberosID entry doesn't
-    //  exist in DB, discarded otherwise.
+    // Generate secret. However, this is used only when the accessId entry
+    // doesn't exist in DB and need to be created, discarded otherwise.
     final String s3Secret = DigestUtils.sha256Hex(OmUtils.getSHADigest());
 
     final UpdateGetS3SecretRequest updateGetS3SecretRequest =
         UpdateGetS3SecretRequest.newBuilder()
+            .setKerberosID(accessId)
             .setAwsSecret(s3Secret)
-            .setKerberosID(accessId).build();
+            .build();
 
     final OMRequest.Builder omRequestBuilder = omRequest.toBuilder()
         .setUpdateGetS3SecretRequest(updateGetS3SecretRequest);
@@ -176,34 +191,15 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     return omRequestBuilder.build();
   }
 
-  @Override
-  public void handleRequestFailure(OzoneManager ozoneManager) {
-    final TenantAssignUserAccessIdRequest request =
-        getOmRequest().getTenantAssignUserAccessIdRequest();
-
-    try {
-      // Undo Authorizer states established in preExecute
-      ozoneManager.getMultiTenantManager().revokeUserAccessId(
-          request.getAccessId());
-    } catch (IOException ioEx) {
-      final String userPrincipal = request.getUserPrincipal();
-      final String tenantId = request.getTenantId();
-      final String accessId = request.getAccessId();
-      ozoneManager.getMultiTenantManager().removeUserAccessIdFromCache(
-          accessId, userPrincipal, tenantId);
-    } catch (Exception e) {
-      // TODO: Ignore for now. See OMTenantCreateRequest#handleRequestFailure
-      // TODO: Temporary solution for remnant tenantCache entry. Might becomes
-      //  useless with Ranger thread impl. Can remove.
-    }
-  }
-
   @Override
   @SuppressWarnings("checkstyle:methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantAssignUsers();
 
@@ -243,14 +239,14 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
       if (!omMetadataManager.getTenantStateTable().isExist(tenantId)) {
         LOG.error("tenant {} doesn't exist", tenantId);
         throw new OMException("tenant '" + tenantId + "' doesn't exist",
-            OMException.ResultCodes.TENANT_NOT_FOUND);
+            ResultCodes.TENANT_NOT_FOUND);
       }
 
       // Expect accessId absence from tenantAccessIdTable
       if (omMetadataManager.getTenantAccessIdTable().isExist(accessId)) {
         LOG.error("accessId {} already exists", accessId);
         throw new OMException("accessId '" + accessId + "' already exists!",
-            OMException.ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
+            ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
       }
 
       OmDBUserPrincipalInfo principalInfo = omMetadataManager
@@ -272,7 +268,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
                 + "to the same tenant more than once. User '" + userPrincipal
                 + "' is already assigned to tenant '" + tenantId + "' with "
                 + "accessId '" + existingAccId + "'.",
-                OMException.ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
+                ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
           }
         }
       }
@@ -313,7 +309,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
         LOG.error("accessId '{}' already exists in S3SecretTable", accessId);
         throw new OMException("accessId '" + accessId +
             "' already exists in S3SecretTable",
-            OMException.ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
+            ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
       }
 
       omMetadataManager.getS3SecretTable().addCacheEntry(
@@ -323,6 +319,10 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
       omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
       acquiredS3SecretLock = false;
 
+      // Update tenant cache
+      multiTenantManager.getCacheOp()
+          .assignUserToTenant(userPrincipal, tenantId, accessId);
+
       // Generate response
       omResponse.setTenantAssignUserAccessIdResponse(
           TenantAssignUserAccessIdResponse.newBuilder()
@@ -333,7 +333,6 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
           omResponse.build(), s3SecretValue, userPrincipal,
           accessId, omDBAccessIdInfo, principalInfo);
     } catch (IOException ex) {
-      handleRequestFailure(ozoneManager);
       exception = ex;
       omResponse.setTenantAssignUserAccessIdResponse(
           TenantAssignUserAccessIdResponse.newBuilder().build());
@@ -349,7 +348,8 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
index a33e74bd1a..b0e509bfc3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
@@ -105,8 +104,6 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
   public static final Logger LOG =
       LoggerFactory.getLogger(OMTenantCreateRequest.class);
 
-  private transient Tenant tenantInContext;
-
   public OMTenantCreateRequest(OMRequest omRequest) {
     super(omRequest);
   }
@@ -115,8 +112,11 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
   @DisallowedUntilLayoutVersion(MULTITENANCY_SCHEMA)
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     // Check Ozone cluster admin privilege
-    ozoneManager.getMultiTenantManager().checkAdmin();
+    multiTenantManager.checkAdmin();
 
     final OMRequest omRequest = super.preExecute(ozoneManager);
     final CreateTenantRequest request = omRequest.getCreateTenantRequest();
@@ -171,12 +171,18 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     final String adminRoleName =
         OMMultiTenantManager.getDefaultAdminRoleName(tenantId);
 
-    // TODO: Acquire some lock
-
-    // If we fail after pre-execute. handleRequestFailure() callback
-    // would clean up any state maintained by the getMultiTenantManager.
-    tenantInContext = ozoneManager.getMultiTenantManager()
-        .createTenantAccessInAuthorizer(tenantId, userRoleName, adminRoleName);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Create tenant roles and policies in Ranger.
+      // If the request fails for some reason, Ranger background sync thread
+      // should clean up any leftover policies and roles.
+      multiTenantManager.getAuthorizerOp().createTenant(
+          tenantId, userRoleName, adminRoleName);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
+    }
 
     final OMRequest.Builder omRequestBuilder = omRequest.toBuilder()
         .setCreateTenantRequest(
@@ -192,27 +198,15 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     return omRequestBuilder.build();
   }
 
-  @Override
-  public void handleRequestFailure(OzoneManager ozoneManager) {
-    try {
-      // Cleanup any state maintained by OMMultiTenantManager
-      if (tenantInContext != null) {
-        ozoneManager.getMultiTenantManager()
-            .removeTenantAccessFromAuthorizer(tenantInContext);
-      }
-    } catch (Exception e) {
-      // TODO: Ignore for now. Multi-Tenant Manager is responsible for
-      //  cleaning up stale state eventually. The Caller is already calling
-      //  this in a failure context and would throw exception anyway.
-    }
-  }
-
   @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantCreates();
     omMetrics.incNumVolumeCreates();
@@ -254,7 +248,7 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
           VOLUME_LOCK, volumeName);
 
       // Check volume existence
-      if (omMetadataManager.getVolumeTable().isExist(volumeName)) {
+      if (omMetadataManager.getVolumeTable().isExist(dbVolumeKey)) {
         LOG.debug("volume: '{}' already exists", volumeName);
         throw new OMException("Volume already exists", VOLUME_ALREADY_EXISTS);
       }
@@ -307,6 +301,10 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
           new CacheKey<>(tenantId),
           new CacheValue<>(Optional.of(omDBTenantState), transactionLogIndex));
 
+      // Update tenant cache
+      multiTenantManager.getCacheOp().createTenant(
+          tenantId, userRoleName, adminRoleName);
+
       omResponse.setCreateTenantResponse(
           CreateTenantResponse.newBuilder()
               .build());
@@ -314,22 +312,6 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
           omVolumeArgs, volumeList, omDBTenantState);
 
     } catch (IOException ex) {
-      // Error handling. Clean up Ranger policies when necessary.
-      if (ex instanceof OMException) {
-        final OMException omEx = (OMException) ex;
-        if (!omEx.getResult().equals(VOLUME_ALREADY_EXISTS) &&
-            !omEx.getResult().equals(TENANT_ALREADY_EXISTS)) {
-          handleRequestFailure(ozoneManager);
-        }
-        // Do NOT perform any clean-up if the exception is a result of
-        //  volume name or tenant name already existing.
-        //  Otherwise in a race condition a late-comer could wipe the
-        //  policies of an existing tenant from Ranger.
-      } else {
-        // ALL OMs should proactively call the clean-up handler in other cases
-        handleRequestFailure(ozoneManager);
-      }
-      // Prepare omClientResponse
       omClientResponse = new OMTenantCreateResponse(
           createErrorOMResponse(omResponse, ex));
       exception = ex;
@@ -342,7 +324,8 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
       if (acquiredVolumeLock) {
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Perform audit logging
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
index 6c09c6a192..f70c8c5cb1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.OzoneTenant;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
@@ -69,13 +72,33 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
   @DisallowedUntilLayoutVersion(MULTITENANCY_SCHEMA)
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
 
+    final OMRequest omRequest = super.preExecute(ozoneManager);
+
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     // Check Ozone cluster admin privilege
-    ozoneManager.getMultiTenantManager().checkAdmin();
+    multiTenantManager.checkAdmin();
+
+    // First get tenant name
+    final String tenantId = omRequest.getDeleteTenantRequest().getTenantId();
+    Preconditions.checkNotNull(tenantId);
 
-    // TODO: Acquire some lock
-    // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
+    // Get tenant object by tenant name
+    final Tenant tenantObj = multiTenantManager.getTenantFromDBById(tenantId);
 
-    return super.preExecute(ozoneManager);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Remove policies and roles from Ranger
+      // TODO: Deactivate (disable) policies instead of delete?
+      multiTenantManager.getAuthorizerOp().deleteTenant(tenantObj);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
+    }
+
+    return omRequest;
   }
 
   @Override
@@ -84,6 +107,9 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantDeletes();
 
@@ -162,8 +188,11 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
         // TODO: Set response dbVolumeKey?
       }
 
-      // Compose response
+      // Update tenant cache
+      multiTenantManager.getCacheOp().deleteTenant(new OzoneTenant(tenantId));
 
+      // Compose response
+      //
       // If decVolumeRefCount is false, return -1 to the client, otherwise
       // return the actual volume refCount. Note if the actual volume refCount
       // becomes negative somehow, omVolumeArgs.decRefCount() would have thrown
@@ -188,7 +217,8 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
       if (acquiredVolumeLock) {
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Perform audit logging
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
index 0b3d974ddb..0f92630ceb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
@@ -89,8 +89,8 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
       Optional<String> optionalTenantId =
           multiTenantManager.getTenantForAccessID(accessId);
       if (!optionalTenantId.isPresent()) {
-        throw new OMException("OmDBAccessIdInfo is missing for accessId '" +
-            accessId + "' in DB.", OMException.ResultCodes.METADATA_ERROR);
+        throw new OMException("accessId '" + accessId + "' is not assigned to "
+            + "any tenant", OMException.ResultCodes.TENANT_NOT_FOUND);
       }
       tenantId = optionalTenantId.get();
       assert (!StringUtils.isEmpty(tenantId));
@@ -101,7 +101,7 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
     // Caller should be an Ozone admin, or a tenant delegated admin
     multiTenantManager.checkTenantAdmin(tenantId, true);
 
-    OmDBAccessIdInfo accessIdInfo = ozoneManager.getMetadataManager()
+    final OmDBAccessIdInfo accessIdInfo = ozoneManager.getMetadataManager()
         .getTenantAccessIdTable().get(accessId);
 
     if (accessIdInfo == null) {
@@ -116,9 +116,16 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
           OMException.ResultCodes.INVALID_TENANT_ID);
     }
 
-    // TODO: Acquire some lock
-    // Remove user (inferred from access ID) from tenant admin role in Ranger
-    ozoneManager.getMultiTenantManager().revokeTenantAdmin(accessId);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Add user to tenant admin role in Ranger.
+      // User principal is inferred from the accessId given.
+      multiTenantManager.getAuthorizerOp().revokeTenantAdmin(accessId);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
+    }
 
     final OMRequest.Builder omRequestBuilder = omRequest.toBuilder()
         .setTenantRevokeAdminRequest(
@@ -137,6 +144,9 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantRevokeAdmins();
 
@@ -188,6 +198,9 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
           new CacheValue<>(Optional.of(newOmDBAccessIdInfo),
               transactionLogIndex));
 
+      // Update tenant cache
+      multiTenantManager.getCacheOp().revokeTenantAdmin(accessId);
+
       omResponse.setTenantRevokeAdminResponse(
           TenantRevokeAdminResponse.newBuilder()
               .build());
@@ -206,7 +219,8 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
index 46f8b0ec69..354d6acb32 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
@@ -107,8 +107,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
       Optional<String> optionalTenantId =
           multiTenantManager.getTenantForAccessID(accessId);
       if (!optionalTenantId.isPresent()) {
-        throw new OMException("OmDBAccessIdInfo is missing for accessId '" +
-            accessId + "' in DB.", OMException.ResultCodes.METADATA_ERROR);
+        throw new OMException("accessId '" + accessId + "' is not assigned to "
+            + "any tenant", ResultCodes.TENANT_NOT_FOUND);
       }
       tenantId = optionalTenantId.get();
       assert (!StringUtils.isEmpty(tenantId));
@@ -120,6 +120,7 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
     // Caller should be an Ozone admin, or at least a tenant non-delegated admin
     multiTenantManager.checkTenantAdmin(tenantId, false);
 
+    // Prevent a tenant admin from being revoked user access
     if (accessIdInfo.getIsAdmin()) {
       throw new OMException("accessId '" + accessId + "' is a tenant admin of "
           + "tenant'" + tenantId + "'. Please revoke its tenant admin "
@@ -127,9 +128,17 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
           ResultCodes.PERMISSION_DENIED);
     }
 
-    // TODO: Acquire some lock
-    // Call OMMTM to revoke user access to tenant
-    ozoneManager.getMultiTenantManager().revokeUserAccessId(accessId);
+    // Acquire write lock to authorizer (Ranger)
+    multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
+    try {
+      // Remove user from tenant user role in Ranger.
+      // User principal is inferred from the accessId given.
+      multiTenantManager.getAuthorizerOp()
+          .revokeUserAccessId(accessId, tenantId);
+    } catch (Exception e) {
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
+      throw e;
+    }
 
     final Builder omRequestBuilder = omRequest.toBuilder()
         .setTenantRevokeUserAccessIdRequest(
@@ -146,6 +155,9 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
 
+    final OMMultiTenantManager multiTenantManager =
+        ozoneManager.getMultiTenantManager();
+
     final OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumTenantRevokeUsers();
 
@@ -179,12 +191,12 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
       // Remove accessId from principalToAccessIdsTable
       OmDBAccessIdInfo omDBAccessIdInfo =
           omMetadataManager.getTenantAccessIdTable().get(accessId);
-      assert (omDBAccessIdInfo != null);
+      Preconditions.checkNotNull(omDBAccessIdInfo);
       userPrincipal = omDBAccessIdInfo.getUserPrincipal();
-      assert (userPrincipal != null);
+      Preconditions.checkNotNull(userPrincipal);
       OmDBUserPrincipalInfo principalInfo = omMetadataManager
           .getPrincipalToAccessIdsTable().getIfExist(userPrincipal);
-      assert (principalInfo != null);
+      Preconditions.checkNotNull(principalInfo);
       principalInfo.removeAccessId(accessId);
       omMetadataManager.getPrincipalToAccessIdsTable().addCacheEntry(
           new CacheKey<>(userPrincipal),
@@ -207,12 +219,16 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
           new CacheKey<>(accessId),
           new CacheValue<>(Optional.absent(), transactionLogIndex));
 
+      // Update tenant cache
+      multiTenantManager.getCacheOp().revokeUserAccessId(accessId, tenantId);
+
       // Generate response
       omResponse.setTenantRevokeUserAccessIdResponse(
           TenantRevokeUserAccessIdResponse.newBuilder()
               .build());
       omClientResponse = new OMTenantRevokeUserAccessIdResponse(
           omResponse.build(), accessId, userPrincipal, principalInfo);
+
     } catch (IOException ex) {
       exception = ex;
       // Prepare omClientResponse
@@ -228,7 +244,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
-      // TODO: Release some lock
+      // Release authorizer write lock
+      multiTenantManager.getAuthorizerLock().unlockWriteInOMRequest();
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestAuthorizerLockImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestAuthorizerLockImpl.java
new file mode 100644
index 0000000000..502537cb2f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestAuthorizerLockImpl.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+
+/**
+ * Tests {@link AuthorizerLockImpl}.
+ */
+public class TestAuthorizerLockImpl {
+
+  @BeforeClass
+  public static void init() {
+    // Enable debug logging for the test
+    GenericTestUtils.setLogLevel(AuthorizerLockImpl.LOG, Level.DEBUG);
+  }
+
+  /**
+   * Tests StampedLock behavior.
+   */
+  @Test
+  @SuppressFBWarnings("IMSE_DONT_CATCH_IMSE")
+  public void testStampedLockBehavior() throws InterruptedException {
+
+    final AuthorizerLock authorizerLock = new AuthorizerLockImpl();
+
+    // Case 1: A correct stamp can unlock without an issue
+    long readLockStamp = authorizerLock.tryReadLock(100);
+    authorizerLock.unlockRead(readLockStamp);
+    long writeLockStamp = authorizerLock.tryWriteLock(100);
+    authorizerLock.unlockWrite(writeLockStamp);
+
+    // Case 1: An incorrect stamp won't be able to unlock, throws IMSE
+    readLockStamp = authorizerLock.tryReadLock(100);
+    try {
+      authorizerLock.unlockRead(readLockStamp - 1L);
+      Assert.fail("Should have thrown IllegalMonitorStateException");
+    } catch (IllegalMonitorStateException ignored) {
+    }
+    authorizerLock.unlockRead(readLockStamp);
+    writeLockStamp = authorizerLock.tryWriteLock(100);
+    try {
+      authorizerLock.unlockWrite(writeLockStamp - 1L);
+      Assert.fail("Should have thrown IllegalMonitorStateException");
+    } catch (IllegalMonitorStateException ignored) {
+    }
+    authorizerLock.unlockWrite(writeLockStamp);
+
+    // Case 2: Read lock is reentrant; Write lock is exclusive
+    long readLockStamp1 = authorizerLock.tryReadLock(100);
+    Assert.assertTrue(readLockStamp1 > 0L);
+    long readLockStamp2 = authorizerLock.tryReadLock(100);
+    Assert.assertTrue(readLockStamp2 > 0L);
+
+    // Can't acquire write lock now, as read lock has been held
+    long writeLockStamp1 = authorizerLock.tryWriteLock(100);
+    // stamp == 0L means lock failure
+    Assert.assertEquals(0L, writeLockStamp1);
+
+    // Release one read lock. Try again. Should fail
+    authorizerLock.unlockRead(readLockStamp2);
+    writeLockStamp1 = authorizerLock.tryWriteLock(100);
+    Assert.assertEquals(0L, writeLockStamp1);
+
+    // Release the other read lock. And again. Should work
+    authorizerLock.unlockRead(readLockStamp1);
+    writeLockStamp1 = authorizerLock.tryWriteLock(100);
+    Assert.assertTrue(writeLockStamp1 > 0L);
+
+    // But a second write lock won't work
+    long writeLockStamp2 = authorizerLock.tryWriteLock(100);
+    Assert.assertEquals(0L, writeLockStamp2);
+
+    // Read lock also won't work now that write lock is held
+    readLockStamp = authorizerLock.tryReadLock(100);
+    Assert.assertEquals(0L, readLockStamp);
+
+    authorizerLock.unlockWrite(writeLockStamp1);
+  }
+
+  @Test
+  public void testLockInOneThreadUnlockInAnother() {
+
+    final AuthorizerLock authorizerLock = new AuthorizerLockImpl();
+
+    try {
+      authorizerLock.tryWriteLockInOMRequest();
+
+      // Spawn another thread to release the lock.
+      // Works as long as they share the same AuthorizerLockImpl instance.
+      final Thread thread1 = new Thread(authorizerLock::unlockWriteInOMRequest);
+      thread1.start();
+    } catch (IOException e) {
+      Assert.fail("Should not have thrown: " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testUnlockWriteInOMRequestShouldNotThrowOnFollowerOMs() {
+
+    final AuthorizerLock authorizerLock = new AuthorizerLockImpl();
+
+    // When a follower OM attempts to unlock write in validateAndUpdateCache,
+    // even though it hasn't acquired the lock in preExecute,
+    // the unlockWriteInOMRequest() method should not throw
+    // IllegalMonitorStateException as it should have been handled gracefully.
+    authorizerLock.unlockWriteInOMRequest();
+  }
+
+  @Test
+  public void testIsWriteLockHeldByCurrentThread() throws IOException {
+
+    final AuthorizerLock authorizerLock = new AuthorizerLockImpl();
+
+    Assert.assertFalse(authorizerLock.isWriteLockHeldByCurrentThread());
+
+    // Read lock does not affect the check
+    long readLockStamp = authorizerLock.tryReadLockThrowOnTimeout();
+    Assert.assertFalse(authorizerLock.isWriteLockHeldByCurrentThread());
+    authorizerLock.unlockRead(readLockStamp);
+
+    // Only a write lock acquired through InOMRequest variant affects the check
+    authorizerLock.tryWriteLockInOMRequest();
+    Assert.assertTrue(authorizerLock.isWriteLockHeldByCurrentThread());
+    authorizerLock.unlockWriteInOMRequest();
+
+    // Regular write lock does not affect the check as well
+    long writeLockStamp = authorizerLock.tryWriteLockThrowOnTimeout();
+    Assert.assertFalse(authorizerLock.isWriteLockHeldByCurrentThread());
+    authorizerLock.unlockWrite(writeLockStamp);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
index 3370673315..d28d1cdc55 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserAccessIdInfo;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -106,8 +105,8 @@ public class TestOMMultiTenantManagerImpl {
 
   @Test
   public void testListUsersInTenant() throws Exception {
-    tenantManager.assignUserToTenant(
-        new BasicUserPrincipal("user1"), TENANT_ID, "accessId1");
+    tenantManager.getCacheOp()
+        .assignUserToTenant("user1", TENANT_ID, "accessId1");
 
     TenantUserList tenantUserList =
         tenantManager.listUsersInTenant(TENANT_ID, "");
@@ -138,10 +137,12 @@ public class TestOMMultiTenantManagerImpl {
   public void testRevokeUserAccessId() throws Exception {
 
     LambdaTestUtils.intercept(OMException.class, () ->
-        tenantManager.revokeUserAccessId("accessId1"));
+        tenantManager.getCacheOp()
+            .revokeUserAccessId("unknown-AccessId1", TENANT_ID));
     assertEquals(1, tenantManager.getTenantCache().size());
 
-    tenantManager.revokeUserAccessId("seed-accessId1");
+    tenantManager.getCacheOp()
+        .revokeUserAccessId("seed-accessId1", TENANT_ID);
     assertTrue(tenantManager.getTenantCache().get(TENANT_ID)
         .getAccessIdInfoMap().isEmpty());
     assertTrue(tenantManager.listUsersInTenant(TENANT_ID, null)
@@ -149,7 +150,7 @@ public class TestOMMultiTenantManagerImpl {
   }
 
   @Test
-  public void testGetTenantForAccessID() throws Exception {
+  public void testGetTenantForAccessId() throws Exception {
     Optional<String> optionalTenant = tenantManager.getTenantForAccessID(
         "seed-accessId1");
     assertTrue(optionalTenant.isPresent());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
index 8e0e871fca..d4be749d51 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Acl;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.http.auth.BasicUserPrincipal;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,14 +40,14 @@ import java.util.stream.Collectors;
  */
 public class TestMultiTenantAccessController {
   private MultiTenantAccessController controller;
-  private List<BasicUserPrincipal> users;
+  private List<String> users;
 
   @Before
   public void setupUsers() {
     // If testing against a real cluster, users must already be added to Ranger.
     users = new ArrayList<>();
-    users.add(new BasicUserPrincipal("om"));
-    users.add(new BasicUserPrincipal("hdfs"));
+    users.add("om");
+    users.add("hdfs");
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
index e23b8f6886..b8cf51979e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
@@ -23,12 +23,14 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.TenantOp;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
@@ -136,10 +138,12 @@ public class TestS3GetSecretRequest {
     when(ozoneManager.getMultiTenantManager()).thenReturn(omMultiTenantManager);
 
     when(tenant.getTenantAccessPolicies()).thenReturn(new ArrayList<>());
-    when(omMultiTenantManager.createTenantAccessInAuthorizer(TENANT_ID,
-            OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID),
-            OMMultiTenantManager.getDefaultAdminRoleName(TENANT_ID)))
-        .thenReturn(tenant);
+    when(omMultiTenantManager.getAuthorizerLock())
+        .thenReturn(new AuthorizerLockImpl());
+    TenantOp authorizerOp = mock(TenantOp.class);
+    TenantOp cacheOp = mock(TenantOp.class);
+    when(omMultiTenantManager.getAuthorizerOp()).thenReturn(authorizerOp);
+    when(omMultiTenantManager.getCacheOp()).thenReturn(cacheOp);
   }
 
   @After
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
index 1ec0096865..364fd21233 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/tenant/TenantAssignAdminHandler.java
@@ -43,8 +43,9 @@ public class TenantAssignAdminHandler extends TenantHandler {
       description = "Tenant name")
   private String tenantId;
 
-  @CommandLine.Option(names = {"-d", "--delegated"}, defaultValue = "true",
-      description = "Set to true (default) to assign delegated admin")
+  @CommandLine.Option(names = {"-d", "--delegated"}, defaultValue = "false",
+      description = "Assign delegated admin. If unspecified, assign "
+          + "non-delegated admin (the default)")
   private boolean delegated;
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org